Here we'll cover some additional topics you may have wondered about while reading the article:
- What about error handling?
- Seq vs lib.Tree and controlling the granularity of parallelism
- How do I create a distributed sequence?
Error handling
While the Remote ability does support fine-grained error handling via functions like Remote.try and tryAwait, coarse-grained policies applied to an entire computation are often appropriate for batch computing workloads like the one we've developed here. Some example policies:
- In the event of a failure, retry repeatedly (possibly with exponential backoff). If the failure keeps happening, a human will eventually intervene to kill the job.
- In the event of a failure, retry up to 4 times, then reraise the failure.
These sorts of general policies can be provided by "middleware" handlers of Remote, rather than by polluting nice implementations like Seq.reduce with retry logic.
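The text describes these policies as Remote middleware handlers. As a simpler, hypothetical illustration of the "retry up to 4 times, then reraise" policy, here is a sketch written as an ordinary combinator over base's Exception ability rather than as a Remote handler. The names attempt and retrying are made up for this example and are not part of the distributed library.

```unison
-- Hypothetical helper (not part of the distributed library):
-- run a thunk, turning a Failure raised via Exception into a Left.
attempt : '{g, Exception} a ->{g} Either Failure a
attempt thunk =
  handle !thunk with cases
    { a } -> Right a
    { Exception.raise f -> _ } -> Left f

-- Hypothetical policy combinator: run `thunk` up to `attempts` times,
-- re-raising the last Failure if every attempt fails.
retrying : Nat -> '{g, Exception} a ->{g, Exception} a
retrying attempts thunk =
  match attempt thunk with
    Right a -> a
    Left f ->
      if attempts <= 1 then Exception.raise f
      else retrying (Nat.drop attempts 1) thunk
```

Wrapping a whole job, for example retrying 4 do Seq.reduce 0 (Nat.+) mySeq (where mySeq is a placeholder), applies the policy to the entire computation without touching Seq.reduce. This sketch only catches failures raised through Exception. Failures surfaced by the Remote ability itself would need the handler-based approach described above.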
Tree vs Seq and controlling the granularity of parallelism
The lib.Tree we used in the tutorial is a bit different from the Seq used in the distributed library. This is subtle stuff. Let's take a look at the differences:
structural type lib.Tree a
  = articles.distributedDatasets.lib.Tree.One (Remote.Value a)
  | articles.distributedDatasets.lib.Tree.Two
      (Remote.Value (lib.Tree a)) (Remote.Value (lib.Tree a))
  | articles.distributedDatasets.lib.Tree.Empty
structural type Seq k a
  = lib.dist_extra.Seq.Seq (Remote.Value (Two Mode a (Seq k a)))
structural type Two m a r
  = lib.dist_extra.structures.Two.Empty
  | lib.dist_extra.structures.Two.Two m r r
  | lib.dist_extra.structures.Two.One a
What's going on here? Well, first, ignore that Seq has an extra k parameter. Also ignore for now the Mode parameter for controlling the granularity of parallelism; we'll discuss that shortly.
More substantially, Seq is defined in terms of two types, Seq and Two. A Seq wraps a Remote.Value containing a Two whose subtree parameter r is Seq itself. This is a little brain-bending if you haven't seen this sort of thing before. All the usage here does, though, is enforce that a Remote.Value wraps each level of the tree, including the root. Recall that Remote.Value is lazy. This means we can't know whether a Seq is empty, a leaf, or a branch without first forcing the Remote.Value.
It's instructive to compare how the two types represent a few example trees:
use Value pure
emptyTree = lib.Tree.Empty
emptySeq = Seq (pure Two.Empty)
leafTree = Tree.One (pure "A")
leafSeq = Seq (pure (Two.One "A"))
branchTree = Tree.Two (pure leafTree) (pure leafTree)
branchSeq leafSeq mode = Seq (pure (Two.Two mode leafSeq leafSeq))
lib.Tree isn't fully lazy. Without looking inside any Remote.Value, we can tell whether the tree is empty, a leaf, or a branch. The laziness only kicks in when we try to see what is inside a leaf, or which subtrees are one level down from a Tree.Two.
In contrast, Seq is fully lazy: we can't obtain any information about its structure without calling either Value.map or Value.get. We can't even tell whether the data structure is empty! We can tell that it is a Seq, but any information about its internal structure is guarded by a Remote.Value.
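To make the contrast concrete, here is a hypothetical pair of emptiness checks (treeIsEmpty and seqIsEmpty are names made up for this sketch). The lib.Tree version is a pure pattern match. The Seq version has to force the outer Remote.Value, so it needs Remote. Its last line mirrors the Value.get (Value.map ...) pattern used by Seq.reduce later in this section.

```unison
-- Hypothetical helper: lib.Tree exposes its top-level shape directly,
-- so checking for emptiness is pure and never touches Remote.
treeIsEmpty : lib.Tree a -> Boolean
treeIsEmpty = cases
  lib.Tree.Empty -> true
  _ -> false

-- Hypothetical helper: Seq hides even its top-level shape behind a
-- Remote.Value, so finding out whether it is empty requires Remote.
seqIsEmpty : Seq k a ->{Remote} Boolean
seqIsEmpty s =
  isEmptyNode = cases
    Two.Empty -> true
    _ -> false
  Value.get (Value.map isEmptyNode (Seq.unwrap s))
```

The two signatures tell the story. One is a plain function, and the other carries ->{Remote} solely because the root is wrapped in a Remote.Value.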
For an ordinary in-memory data structure, the difference between these two phrasings might not matter much in practice. For distributed data structures, it's a more substantive decision. Consider the function Seq.flatMap, and imagine calling it with a function that does some serious work:
Seq.flatMap :
  (a ->{Exception, Remote} Seq Defer b) -> Seq k a -> Seq Defer b
Seq.flatMap f s =
  use Seq unwrap
  use Two Empty
  use Value pure
  go :
    Two Mode a (Seq k a) -> Remote.Value (Two Mode b (Seq Defer b))
  go = cases
    Empty -> pure Empty
    Two.One a -> unwrap (f a)
    Two.Two mode l r ->
      pure (Two.Two mode (Seq.flatMap f l) (Seq.flatMap f r))
  Seq (Value.flatMap go (unwrap s))
The above implementation is nicely lazy and does no work until the sequence is later forced (by a Seq.reduce, say). In contrast, imagine writing flatMap for lib.Tree. If the tree happens to be a leaf, it will have to apply the function strictly, which means that flatMap now requires access to Remote.
This is a bit of an awkward programming model. Some computation runs strictly some of the time, other computations are evaluated lazily only when needed, and which one you get depends on the size of the tree! Seq has a more uniform representation that ensures no work ever happens until the Seq is forced by a function like Seq.reduce, and this is often a good choice.
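The tutorial doesn't define flatMap for lib.Tree, so here is a hypothetical sketch (treeFlatMap is our name) that shows where the strictness comes from. It assumes Value.map accepts a function that uses Remote, as it does in parallel.Tree.reduce.

```unison
-- Hypothetical flatMap for lib.Tree (not defined in the tutorial).
treeFlatMap : (a ->{Remote} lib.Tree b) -> lib.Tree a ->{Remote} lib.Tree b
treeFlatMap f = cases
  lib.Tree.Empty -> lib.Tree.Empty
  -- A leaf is visible without forcing anything, so we must force its
  -- value and apply f right now. This case is why the result is ->{Remote}.
  Tree.One valueA -> f (Value.get valueA)
  -- Branches can stay lazy: the recursive calls run only when the
  -- wrapped subtrees are forced later.
  Tree.Two l r ->
    Tree.Two (Value.map (treeFlatMap f) l) (Value.map (treeFlatMap f) r)
```

Compare the result types. Seq.flatMap returns a Seq without needing Remote at the call site. This version must be run with Remote because of the Tree.One case.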
Controlling chunking and the granularity of parallelism
Let's look at the implementation of parallel.Tree.reduce:
parallel.Tree.reduce :
  a -> (a ->{Remote} a ->{Remote} a) -> lib.Tree a ->{Remote} a
parallel.Tree.reduce zero combine = cases
  lib.Tree.Empty -> zero
  Tree.One valueA -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Remote fork
    use Value get map
    use parallel.Tree reduce
    leftTask =
      fork here! do get (map (t -> reduce zero combine t) leftValue)
    rightTask =
      fork here! do get (map (t -> reduce zero combine t) rightValue)
    left' = await leftTask
    right' = await rightTask
    combine left' right'
This does a parallel reduce of the tree, forking threads at each Tree.Two. That is fine if each subtree represents a large chunk of work. Take a Tree Nat, though. Close to the leaves of such a tree, there's so little data that the overhead of forking a thread to process it may not pay for itself, even with Unison's lightweight threads.
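One simple workaround with lib.Tree is to fork only near the root. The sketch below is a hypothetical variant of parallel.Tree.reduce (reduceForkDepth and its depth budget are our inventions, not part of the tutorial). It forks in the top depth levels and reduces deeper subtrees in the current thread. It reuses only the calls that appear in parallel.Tree.reduce above.

```unison
-- Hypothetical variant of parallel.Tree.reduce: fork only while the
-- depth budget is positive, then reduce the remaining subtrees sequentially.
reduceForkDepth :
  Nat -> a -> (a ->{Remote} a ->{Remote} a) -> lib.Tree a ->{Remote} a
reduceForkDepth depth zero combine = cases
  lib.Tree.Empty -> zero
  Tree.One valueA -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Remote await fork
    use Value get map
    -- Reduce one child with a depth budget one smaller (saturating at 0).
    sub v = get (map (t -> reduceForkDepth (Nat.drop depth 1) zero combine t) v)
    if depth == 0 then
      -- Deep in the tree: not worth a fork, so recurse in this thread.
      combine (sub leftValue) (sub rightValue)
    else
      leftTask = fork here! do sub leftValue
      rightTask = fork here! do sub rightValue
      combine (await leftTask) (await rightTask)
```

A depth budget is a blunt heuristic because lib.Tree records nothing about subtree sizes. Seq instead records the fork-or-not decision in the data itself.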
The Seq type allows branches to be annotated with a Mode indicating whether they should be Parallel or Sequential. Functions like Seq.reduce respect the annotation and only fork threads if the annotation indicates it's worthwhile, and functions like Seq.map leave the annotation alone:
Seq.reduce : m -> (m -> m ->{Remote} m) -> Seq k m ->{Remote} m
Seq.reduce z op s =
  use Seq reduce
  use Two Two
  use distributed_6_0_0.Remote await fork
  go = cases
    Two.Empty -> z
    Two.One a -> a
    Two Sequential l r -> op (reduce z op l) (reduce z op r)
    Two Parallel l r ->
      tl = fork here! do reduce z op l
      tr = fork here! do reduce z op r
      op (await tl) (await tr)
  Value.get (Value.map go (Seq.unwrap s))
When building up a distributed sequence, you can control the granularity of parallelism by choosing Parallel or Sequential for each Two.Two node you construct. Functions like fromChunkedList do this automatically. We'll talk more about constructing sequences in the next section.
In addition to using the Mode annotation, you can also work with trees whose leaf values are some chunk type. So rather than working with a Tree Nat, you could instead work with Tree [Nat] or use some more specialized chunk type. This sort of explicit chunking isn't as nice to program with, but it can offer better performance.
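Here is a minimal sketch of explicit chunking with [Nat] leaves. Each leaf is summed with a plain in-memory fold, and only the per-chunk totals are combined through Seq.reduce. It assumes Seq.map has a shape like (a -> b) -> Seq k a -> Seq k b. That signature isn't shown in this article, so sumChunked's type is left to inference. The names chunkSum and sumChunked are hypothetical.

```unison
-- Hypothetical: sum one chunk with an ordinary in-memory fold.
chunkSum : [Nat] -> Nat
chunkSum chunk = List.foldLeft (Nat.+) 0 chunk

-- Hypothetical: one tree leaf per chunk rather than per element.
-- Type left to inference because it depends on Seq.map's exact signature.
sumChunked chunks = Seq.reduce 0 (Nat.+) (Seq.map chunkSum chunks)
```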
Creating distributed sequences
It's nice that we can write functions like Seq.map that preserve the existing partitioning of the data onto multiple nodes. But how do we actually create a distributed sequence in the first place? This section gives the basic idea of how to construct distributed sequences in terms of the Location type used within Remote.
Distributed sequences will typically be created lazily: we repeatedly split some state in half, and once the state hits a base case we produce a leaf or an empty sequence. For instance, here's a function that produces a Seq from a list, placing every chunkSize-sized subtree together at a single location:
fromListAt : Location g -> Nat -> [a] -> Seq k a
fromListAt region chunkSize as =
  use List halve
  use Nat * <= >=
  use Two Two
  step isRoot count as =
    if List.size as <= chunkSize then
      distributed_6_0_0.Value.at
        region (Seq.unwrap (Seq.fromList Sequential as))
        |> Value.join
    else
      if count >= chunkSize || isRoot then
        distributed_6_0_0.Value.delayAt region do
          (l, r) = halve as
          Two Parallel (Seq (step false 2 l)) (Seq (step false 2 r))
      else
        (l, r) = halve as
        count' = count * 2
        Value.pure
          (Two
            Parallel
            (Seq (step false count' l))
            (Seq (step false count' r)))
  Seq (step true 1 as)
As a sample use case, we might call this with a list of URLs or file names from S3. A subsequent Seq.map or Seq.flatMap can then be used to fetch (or "hydrate") the contents of those URLs. The loading happens lazily, only when the resulting Seq is forced.
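Here is a hypothetical sketch of that pattern. fetchContents is a stand-in that only builds a string. A real fetch would need an ability for network or storage access, which is omitted here. As before, Seq.map's exact signature is assumed rather than shown, so hydrated's type is left to inference. The chunk size of 100 is an arbitrary example value.

```unison
-- Hypothetical stand-in for a real fetch (e.g. reading an object from S3).
fetchContents : Text -> Text
fetchContents url = "contents of " ++ url

-- Build the sequence of URLs, then describe the fetch with Seq.map.
-- Nothing is fetched here: the work runs only when the result is forced,
-- for example by Seq.reduce.
hydrated region urls = Seq.map fetchContents (fromListAt region 100 urls)
```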
If the sequence is so large that not even the names of all the files fit in memory, we might build it recursively. We can use fromListAt to list the top-level directory names, then Seq.flatMap to build a sequence for the files under each directory.
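A hypothetical sketch of that two-level construction follows. listFiles is a stand-in for a real storage listing call, and the chunk sizes 10 and 100 are arbitrary. The types line up with the Seq.flatMap and fromListAt signatures shown earlier: the inner fromListAt's free k unifies with Defer.

```unison
-- Hypothetical stand-in for listing the files under one directory.
-- A real version would call a storage API and need extra abilities.
listFiles : Text -> [Text]
listFiles dir = [dir ++ "/part-0", dir ++ "/part-1"]

-- One sequence over the top-level directories. Each directory lazily
-- expands into its own sequence of file names when the result is forced.
allFiles : Location g -> [Text] -> Seq Defer Text
allFiles region dirs =
  Seq.flatMap
    (dir -> fromListAt region 100 (listFiles dir))
    (fromListAt region 10 dirs)
```

Only one directory's listing needs to be in memory at a time, and only once that part of the sequence is forced.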
We can also use more general combinators like Seq.unfold or skewUnfoldAt.