Spark-like datasets in Unison

Incremental evaluation via memoization

by
  • Rebecca Mark
  • Paul Chiusano

Let's imagine the following scenario: you're MegaCorp's Chief WordCount Scientist. Every day you receive a file containing a list of all Shakespeare quotations found when crawling the internet, each paired with the url where it was spotted. For instance, this article quotes the line "Shall I compare thee to a summer's day?"... right now, in the sentence you are currently reading.

You're charged with computing various statistics about this dataset (say, a histogram for the top 10 Shakespeare quotes), and enabling MegaCorp's Business Intelligence team to answer Very Important Questions ("does Shakespeare get quoted more often on Twitter, reddit, the NY Times, or in YouTube comment threads?").

Now, since there isn't any new Shakespeare being written (he is dead after all), and people admittedly aren't dropping Shakespeare quotes all that often in new content posted to the internet, it's likely much of the dataset will be the same from one day to the next. It would be nice to not have to process it from scratch every time if not much has changed.

More generally, we'd prefer it if incremental changes to a dataset required incremental computation to produce an answer. We don't want to rerun an entire batch job just because one or two new pieces of data filtered in! In this part, we'll show a nice solution to this problem for computations defined on the lib.Tree type we've developed so far.

This issue also arises for batch jobs that run on what is effectively an "append-only" dataset, like a collection of log files.

While it's possible to rewrite such batch jobs in a way that takes advantage of the append-only nature of a dataset, this can require awkward rephrasing of the computation, not to mention serialization boilerplate.

A good solution for computing results incrementally can be a huge deal for larger batch jobs: the difference between a job that finishes in seconds and gets run and refined interactively vs a job that's run overnight. In principle, incremental evaluation "just" requires caching deterministic computations whose results you've computed before.

Doing this robustly and accurately requires a way of uniquely identifying computations that will change whenever the computation or any of its dependencies change.

In Unison, we'll use the hash of a computation as its identity. The hash of a function incorporates the hashes of all of its dependencies and will change if any of its dependencies change.

Our lib.Tree is nicely set up to take advantage of caching. Imagine we're computing a reduce of the tree. If any subtree of the reduction has been seen before (if it has the same hash), we can just return the reduced result immediately from the cache rather than descending into that subtree.

The same idea also applies to, say, a lib.Tree.map. Suppose we have lib.Tree.map expensiveFn t. If expensiveFn has already been applied at some subtree and cached, the resulting transformed tree can be retrieved from the cache.

Fortunately, Remote.Value includes a function for memoizing remote values, caching them at the location where the data resides, and we can use it to implement a "memoized reduce" and other operations:

Value.memo loc v returns a new Remote.Value that intercepts calls to Value.get on v. Such calls first consult a cache, keyed by the hash of v. If a result is found in the cache, it's returned immediately. Otherwise, v is forced and the result is inserted into the cache for next time.

The caching happens at the location of v, and if that location doesn't support this capability, another Location within loc will be consulted as a fallback. The exact ability required here is Scratch, an abstract API for ephemeral local caching (using a combination of RAM and/or disk), but the basic logic of Value.memo would look similar when the cache provider is some other ability besides Scratch.
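To make that behavior concrete, here's a rough sketch of the logic. This is not the library's actual implementation: Value.delay, Value.hash, Scratch.lookup, and Scratch.insert are hypothetical names standing in for the real delaying, hashing, and cache operations.

-- A sketch only: Value.delay, Value.hash, Scratch.lookup, and
-- Scratch.insert are hypothetical stand-ins for the real API.
sketch.Value.memo : Location {Scratch} -> Value a -> Value a
sketch.Value.memo loc v =
  -- loc chooses where the cache lives; this sketch elides the
  -- fallback-Location logic described above.
  Value.delay do
    key = Value.hash v            -- the computation's identity
    match Scratch.lookup key with
      Some result -> result       -- cache hit: skip the work entirely
      None ->
        result = Value.get v      -- cache miss: force v as usual
        Scratch.insert key result -- remember the result for next time
        result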

📓
The Location type is used throughout the Remote ability to represent "places where data can live or computations can happen" in an abstract way. It will be mapped to, say, a hostname and port by a Remote handler that does actual distributed execution.

Let's look at an implementation of reduce that produces memoized results. Such an implementation will support efficient incremental recomputation when the input changes slightly, because the results of subtrees will be cached.

memo1.Tree.reduce :
  Location {Scratch, g}
  -> a
  -> (a -> '{Remote} a ->{Remote} a)
  -> lib.Tree a
  ->{Remote} a
memo1.Tree.reduce scratch a combine = cases
  lib.Tree.Empty -> a
  -- Leaf: memoize the leaf's value before forcing it.
  Tree.One valueA -> Value.get (Value.memo scratch valueA)
  -- Branch: memoize the reduction of each subtree, then evaluate
  -- the two halves in parallel and combine them.
  Tree.Two l r ->
    use Remote fork
    use Value get map memo
    use memo1.Tree reduce
    lValue =
      memo
        scratch
        (map (leftTree -> reduce scratch a combine leftTree) l)
    rValue =
      memo
        scratch
        (map (rightTree -> reduce scratch a combine rightTree) r)
    l' = fork here! do get lValue
    r' = fork here! do get rValue
    combine (await l') do await r'

The call to Value.memo happens when we handle the Tree.One and Tree.Two cases. So this reduce function caches the values at the leaf nodes, but it also caches the computation of each branch in the tree by wrapping the calls to Value.map in Value.memo.

When the reduce runs a second time, recursion into a subtree is cut off whenever that subtree's result was previously computed and still resides in the cache.

This reduce function now requires a Location argument, since Value.memo needs a Location with the Scratch ability to store its cache.
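For instance, a daily word-count job might call it like this. Here scratchLoc and quoteCounts are hypothetical stand-ins for a Scratch-capable Location and the day's dataset:

-- A usage sketch. scratchLoc : Location {Scratch} and
-- quoteCounts : lib.Tree Nat are assumed to be defined elsewhere.
totalQuotes : '{Remote} Nat
totalQuotes = do
  -- On a re-run over mostly unchanged data, any subtree whose
  -- result is already cached is never descended into.
  memo1.Tree.reduce scratchLoc 0 (acc rest -> acc + rest ()) quoteCounts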

An exercise: Memoize the tree
📓

Exercise: Implement a Tree.memo function

Write a Tree.memo function which at every level memoizes the evaluation of a lib.Tree.

Show me the answer
answers.Tree.memo : Location {Scratch} -> lib.Tree a -> lib.Tree a
answers.Tree.memo location = cases
  lib.Tree.Empty -> lib.Tree.Empty
  -- Leaf: memoize the value it holds.
  Tree.One valueA -> Tree.One (Value.memo location valueA)
  -- Branch: memoize each half, recursively memoizing its subtrees.
  Tree.Two l r ->
    Tree.Two
      (Value.memo location (Value.map (answers.Tree.memo location) l))
      (Value.memo location (Value.map (answers.Tree.memo location) r))

After a particularly expensive computation runs on the lib.Tree, one way to speed up subsequent computations is to call the answers.Tree.memo function written above as a caching strategy. In addition to caching already-computed data between jobs, you might use answers.Tree.memo between transformations in a data pipeline so sub-stages of the pipeline don't need to recompute data.
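Sketching that idea, with parse and classify as hypothetical per-element transformations, Stats as a hypothetical result type, and quotes and scratchLoc assumed to be defined elsewhere:

-- A hypothetical pipeline sketch: memoizing between stages means a
-- re-run over slightly changed input only recomputes the changed
-- subtrees at each stage.
pipeline : '{Remote} lib.Tree Stats
pipeline = do
  parsed = answers.Tree.memo scratchLoc (lib.Tree.map parse quotes)
  answers.Tree.memo scratchLoc (lib.Tree.map classify parsed)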

Try your own map-reduce
📓

Exercise: Write a test program using the pure interpreter

This exercise is up to the reader 😎. See if you can use the functions we've written on lib.Tree to write a simple map-reduce program. run.pure is an in-memory interpreter for the Remote ability.
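If you'd like a starting point, such a program might take roughly this shape. Tree.fromList is a hypothetical constructor, and we're assuming run.pure has a type like '{Remote} a -> a and that lib.Tree.reduce from earlier in the series has the same shape as memo1.Tree.reduce minus the Location:

-- A rough skeleton, not a full answer. Tree.fromList is hypothetical;
-- run.pure is the in-memory Remote interpreter mentioned above.
test.totalQuotes : Nat
test.totalQuotes = run.pure do
  quotes = Tree.fromList ["to be", "or not", "to be"]
  -- map: one count per quote occurrence
  ones = lib.Tree.map (_ -> 1) quotes
  -- reduce: sum the counts
  lib.Tree.reduce 0 (acc rest -> acc + rest ()) ones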

The use of Value.memo provides clear efficiency gains: cached data means faster jobs. But we're also saving on engineering costs: we don't need to contort our code or deal with manual serialization of cached state, nor do we need any infrastructure beyond what's already used to run Remote computations.

Conclusions

Phew! We've covered a lot of ground in this article. Here's a quick recap:

  • We got a preview of the distributed Seq type and how it enables Spark-like distributed computations. The type is just a few lines of code but lets us nicely express many distributed operations like Seq.map, Seq.reduce, and lots more.
  • We learned the general method for making any immutable data structure distributed, and how to implement functions like map and reduce in a way that "brings the computation to the data".
  • We showed how computations over distributed datasets could be memoized to enable efficient incremental evaluation.

We hope you enjoyed this article. It's the first of a series on compositional distributed systems, each showing how powerful libraries for distributed computing can be assembled from reusable components in a tiny amount of Unison code. The examples shown in this series aren't big frameworks siloed from your "regular" code. These are just ordinary libraries that are a function call away, and that can be combined with the aid of a typechecker rather than a mess of glue and duct tape.

Distributed programming can be fun and approachable. While there are new things to think about ("where do I want the data to live and the computation to happen?"), Unison helps you say exactly what you mean with a minimum of ceremony, letting you focus on the interesting parts and not on the tedious particulars of simply moving data and computations from one place to another.

We'll cover some additional topics in the last section. Read on to learn about approaches to error handling, smarter chunking and granularity of parallelism, and more.

Got questions or comments? Feel free to open a discussion tagged 'distributed-api' here or chat with us in the #distributed channel of the Unison Slack.

If you're interested in using Unison at work for this sort of thing, we'd love to chat with you.