## MapReduce I

The paradigm of MapReduce, initially publicized by Google, is described in many sources. For presentations and reports, here are the primary sources:

OSDI'04 Paper: the page links to both the PDF of the paper and the slides used in the talk. You can copy some of the figures from these slides, but you should describe the paradigm and implementation in your own words. An alternative citation for this work is the CACM article (readable from a university internet address).

Step Back: this page is a criticism of the paradigm from the view of a leading researcher in the database community, and has many pointers and citations for background. To fully appreciate the points for and against, some knowledge of how database systems function is assumed.

There are many additional articles, but for the first presentation/report, don't go into all of the imaginative ways that MapReduce might be used -- that will be part of another presentation, described below.

Update: please see the /Background page for a general introduction to what MapReduce is.

## MapReduce II

Supposing that the presentation/report on MapReduce above explains the programming model and shows a few examples, the topic for this part is to show in more depth how the paradigm can be used, and what the considerations are for performance, fault tolerance, databases, and application-specific uses. Here are some sources for this:

PACT'06 paper: an invited talk, with slides. This gives a broad picture of how MapReduce has been used at Google, with some performance numbers.

The HPCA'07 paper and the accompanying movie give a more in-depth study of performance and alternatives. Within this study, you can see different benchmarks and programming tasks that could use MapReduce.

Other usages of MapReduce can be found in the Wikipedia section on papers, including implementation on graphics processors and for relational data models.

## MapReduce III

Let's consider the theory and software patterns behind MapReduce. One thing to look at is a catalog of the basic ideas behind parallel programming, which includes such topics as "master/worker" patterns, data-parallel computing, pipelining, and systolic arrays. Actually, there's quite a lot to study, and general conclusions become weak when the hardware architecture grows more specialized. If we look at the introductory exercises for MapReduce beginners, the list is more or less the following:

- From a list of words, output the number of times each word occurs.
- From a list of words, output those words that occur exactly once.
- Sort a terabyte of records.
- Given a set of webpages, construct a list, per URL, of the webpages that refer to that URL.
- Calculate per employee taxes where the input is a set of (empl,sal,mon) tuples.
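The first two exercises can be sketched as a single-process Python simulation of the map, shuffle/sort, and reduce phases. This is a minimal illustration, not a real framework: the function names and sample documents are hypothetical, and a real system would run many map and reduce tasks on different machines. Notice that the two exercises differ only in the reduce function.

```python
from collections import defaultdict

def map_words(doc_id, text):
    # Map: emit (word, 1) for every word occurrence in one document.
    for word in text.split():
        yield word, 1

def shuffle(pairs):
    # Shuffle/sort: group all values by key, then order the keys,
    # which is what the framework does between map and reduce.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())

def reduce_count(word, counts):
    # Reduce for "how many times does each word occur".
    yield word, sum(counts)

def reduce_unique(word, counts):
    # Reduce for "which words occur exactly once".
    if sum(counts) == 1:
        yield word

def run(docs, reducer):
    intermediate = [kv for doc_id, text in docs.items()
                    for kv in map_words(doc_id, text)]
    output = []
    for key, values in shuffle(intermediate):
        output.extend(reducer(key, values))
    return output

docs = {"d1": "the cat sat", "d2": "the dog sat down"}
print(run(docs, reduce_count))
# [('cat', 1), ('dog', 1), ('down', 1), ('sat', 2), ('the', 2)]
print(run(docs, reduce_unique))
# ['cat', 'dog', 'down']
```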

This list is merely a starting point. What else can MapReduce do? Maybe the better question is what can it *not* do? If multiple uses of MapReduce are allowed, perhaps in some larger iteration, then even graph problems can be solved, for instance, MapReduce Dijkstra.
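To make "MapReduce in some larger iteration" concrete, here is a minimal sketch of single-source shortest paths where each round is one MapReduce job and a driver loop reruns it until no distance changes. Note that this is Bellman-Ford-style relaxation rather than Dijkstra's priority-queue algorithm; it is one common way to express shortest paths in MapReduce, not necessarily the formulation in the linked material. The graph, the function names, and the shortcut of reading the adjacency lists from a shared dictionary (a real job would pass them through the shuffle with each node) are all assumptions for illustration, and weights are assumed to be non-negative.

```python
import math
from collections import defaultdict

INF = math.inf

def relax_round(graph, dist):
    """One MapReduce round. graph: node -> [(neighbor, weight)];
    dist: node -> best distance known so far."""
    intermediate = []
    for node, d in dist.items():
        # Map: keep the node's own distance and propose d + w to each neighbor.
        intermediate.append((node, d))
        if d < INF:
            for neighbor, weight in graph.get(node, []):
                intermediate.append((neighbor, d + weight))
    groups = defaultdict(list)  # shuffle: group proposals by node
    for node, d in intermediate:
        groups[node].append(d)
    # Reduce: each node keeps the smallest proposed distance.
    return {node: min(ds) for node, ds in groups.items()}

def shortest_paths(graph, source):
    nodes = set(graph) | {n for edges in graph.values() for n, _ in edges}
    dist = {n: (0 if n == source else INF) for n in nodes}
    while True:  # the "larger iteration": rerun MapReduce until nothing changes
        new_dist = relax_round(graph, dist)
        if new_dist == dist:
            return dist
        dist = new_dist

graph = {"a": [("b", 1), ("c", 4)], "b": [("c", 2), ("d", 5)], "c": [("d", 1)]}
dist = shortest_paths(graph, "a")
print(sorted(dist.items()))  # [('a', 0), ('b', 1), ('c', 3), ('d', 4)]
```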

The starting point to better understand MapReduce goes back to functional programming (see the /Background page). Then, it helps to understand the MapReduce "artifact", that is, the specific system first built at Google, and later available through Hadoop (open source) and other implementations, including commercial ones such as Greenplum and Aster. The Google paper on MapReduce gets into the fault tolerance and data replication aspects, which follow from using GFS and the commodity hardware platform. Which design decisions in MapReduce are owed mainly to GFS, and which are more fundamental to parallel programming?

Think about the part of MapReduce beyond the Map and Reduce functions. The user can also supply a partitioning function, which decides which reducer receives each intermediate key during the shuffle/sort phase; this turns out to be an essential feature for some jobs. What about sorting: is it always necessary? For some jobs the sorting isn't needed, yet it is a potential bottleneck to parallel processing. Can MapReduce be tweaked further to allow better performance?
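Two partitioning functions are sketched below as an illustration, assuming string keys. The first imitates the common default (a hash of the key modulo the number of reducers R); it uses `zlib.crc32` because Python's built-in `hash()` of a string is randomized per process, so separate workers could disagree. The second is range partitioning with hypothetical split points: if each reducer sorts its own keys, concatenating the reducer outputs in order gives a globally sorted result, which is one way to approach the "sort a terabyte" exercise. Neither is presented as any particular framework's API.

```python
import bisect
import zlib

def hash_partition(key: str, num_reducers: int) -> int:
    # Default-style partitioning: hash(key) mod R, stable across processes.
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def range_partition(key: str, boundaries: list[str]) -> int:
    # Range partitioning with sorted split points: reducer r receives keys
    # with boundaries[r-1] <= key < boundaries[r].
    return bisect.bisect_right(boundaries, key)

keys = ["pear", "apple", "zebra", "kiwi", "hat", "orange"]
boundaries = ["h", "p"]  # hypothetical split points for 3 reducers
reducers = [[] for _ in range(len(boundaries) + 1)]
for key in keys:
    reducers[range_partition(key, boundaries)].append(key)
# Each reducer sorts its own keys; concatenating reducer outputs in order
# then gives a globally sorted result.
result = [k for bucket in reducers for k in sorted(bucket)]
print(result)  # ['apple', 'hat', 'kiwi', 'orange', 'pear', 'zebra']
```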

Some examples found in these Google Lectures can help us understand better how MapReduce is used.

A more practical story of using MapReduce is here.

## MapReduce IV

So you have a problem with a large input dataset and you would like to process the data using parallel processors, using the MapReduce paradigm. Where do you start, and how do you come up with the map and reduce functions? Here's a rough idea of the design steps (but not necessarily in order):

- Choose a representation of your problem, and of the data flowing through the phases of MapReduce (from map to reduce, and the reduce output), in terms of tuples with keys and values.
- Devise an appropriate map function.
- (Optional step for Google's MapReduce; may not be appropriate for other MapReduce implementations.) Specify a "partition" method that decides where the output tuples of the map step should be placed for subsequent reduce processing. The default is a hash of the key, modulo the number of reducers. Also, decide how many reducers you would like to use.
- Devise an appropriate reduce function.
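The design steps can be seen side by side in a small single-process driver, applied here to the earlier exercise of listing, per URL, the pages that refer to it. This is a sketch under simplifying assumptions: `run_mapreduce` and the other names are hypothetical, everything runs in one process, and the page data is made up.

```python
import zlib
from collections import defaultdict

def run_mapreduce(inputs, map_fn, reduce_fn, num_reducers, partition_fn):
    """Single-process sketch of the design steps: map, partition, sort, reduce."""
    # One bucket per reducer; the partition function decides which bucket.
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for key, value in inputs:
        for k2, v2 in map_fn(key, value):
            buckets[partition_fn(k2, num_reducers)][k2].append(v2)
    output = []
    for bucket in buckets:
        for k2 in sorted(bucket):  # each reducer sees its keys in sorted order
            output.extend(reduce_fn(k2, bucket[k2]))
    return output

def hash_partition(key, num_reducers):
    return zlib.crc32(key.encode("utf-8")) % num_reducers

# Representation: input (page URL, outgoing links);
# intermediate (target URL, source URL); output (target URL, sorted sources).
def map_links(page, outlinks):
    for target in outlinks:
        yield target, page

def reduce_links(target, sources):
    yield target, sorted(sources)

pages = {"a.html": ["b.html", "c.html"], "b.html": ["c.html"], "c.html": ["a.html"]}
backlinks = dict(run_mapreduce(pages.items(), map_links, reduce_links, 2, hash_partition))
print(backlinks["c.html"])  # ['a.html', 'b.html']
```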

Carrying out the steps above may not be easy or obvious. A useful **design hint** for MapReduce programming is to start from the perspective of the final output, and then reason backwards. What should the final output look like? Once you know that, can you figure out what input reducers should have in order to create such an output? If you have the answer to that, you may be able to decide on what map functions need to do.

Example: Matrix Multiplication. Suppose we have two matrices A and B, and we want to use MapReduce to calculate the product C = A*B. For purposes of illustration, we have a trivial example of 3x3 matrices:

- first, here is matrix A (the entries marked "?" would be numeric values, but we'll concentrate on the second row of the matrix, so only the values of the second row are shown):

  $$A = \begin{pmatrix} ? & ? & ? \\ 1 & 2 & 3 \\ ? & ? & ? \end{pmatrix}$$

- second, here is matrix B (again, we focus only on some entries, in this case the first column):

  $$B = \begin{pmatrix} 4 & ? & ? \\ 5 & ? & ? \\ 6 & ? & ? \end{pmatrix}$$

- the result matrix C is as follows (only C[1][0] is shown, because all we know for sure is the second row of A and the first column of B; 32 = 1*4 + 2*5 + 3*6):

  $$C = \begin{pmatrix} ? & ? & ? \\ 32 & ? & ? \\ ? & ? & ? \end{pmatrix}$$

Let us now work on the design of reduce and the problem of choosing an appropriate representation of data flowing from map to reduce. In particular, think about what reduce would need to output "32" as part of the result matrix C. Here are the things needed to calculate C[1][0]:

- the second row of A, namely A[1][0], A[1][1], and A[1][2]
- the first column of B, which is B[0][0], B[1][0], and B[2][0]

In class, there were suggestions of tuples that a reducer might receive in order to get all these items. There might be a list of tuples such as:

```text
( (1,0,1,0,"A"), 1 )
( (1,0,0,0,"B"), 4 )
( (1,0,1,1,"A"), 2 )
( (1,0,1,0,"B"), 5 )
( (1,0,1,2,"A"), 3 )
( (1,0,2,0,"B"), 6 )
```

Explanation:

- the key is itself a 5-tuple
- the first two numbers in the key identify the row and column of C
- the next two numbers identify the row and column of the element within its source matrix (A or B)
- the last item in the key is the name of the source matrix
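A quick way to see what a lexicographic sort does to these keys is to sort the six tuples in Python, whose tuple comparison is lexicographic. This is only a stand-in for the framework's sort, assumed here to order composite keys field by field.

```python
# The six tuples suggested in class: ((C row, C col, src row, src col, matrix), value)
tuples = [
    ((1, 0, 1, 0, "A"), 1), ((1, 0, 0, 0, "B"), 4),
    ((1, 0, 1, 1, "A"), 2), ((1, 0, 1, 0, "B"), 5),
    ((1, 0, 1, 2, "A"), 3), ((1, 0, 2, 0, "B"), 6),
]
# Python compares tuples lexicographically, like a sort on composite keys.
arrival = [value for key, value in sorted(tuples)]
print(arrival)  # [4, 1, 5, 2, 3, 6]
```

For C[1][0] the consecutive pairs happen to line up anyway (4*1, 5*2, 3*6), but for C[0][0] the sorted order would be A[0][0], B[0][0], A[0][1], A[0][2], B[1][0], B[2][0], so pairing consecutive values would no longer work.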

While this idea could work, notice that the keys listed above are not in sorted order: assuming keys are sorted lexicographically, the values will not arrive in the order shown. Of course, starting the key with (1,0) is important so that reduce gets all the items for C[1][0] grouped together, which is what the partition/sort phase of MapReduce does for us automatically. But after that, do we really need the rest of the key? Recall again what's needed for the calculation:

`1*4 + 2*5 + 3*6`

Notice that if the values arrive in the order 1,4,2,5,3,6 -- then reduce can be easily programmed without having to keep track of entire rows or columns. It will be a simple sum, with pseudo code such as:

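```
// suppose "n" is known; here n = 3 because the matrices are 3x3
int n = 3;
int Sum = 0;
int c = 0;             // number of tuples processed so far
bool havePrev = false; // true while p holds a value still waiting for its partner
int p;                 // value from the previous tuple
void reduce(k, v) {
  if (!havePrev) { p = v; havePrev = true; c++; return; }
  Sum += p * v;        // multiply the pair and add it to the running sum
  havePrev = false;
  c++;
  if (c == 2*n) emit(k, Sum);  // all 2n values for this entry have arrived
}
```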

(actual programming of reduce would be more sophisticated, of course)
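If reduce instead receives the whole list of values for a key at once, in the style of Google's reduce(key, values), the pairwise sum needs no state between calls. A minimal Python sketch, assuming (as above) that the values arrive as A[i][0], B[0][k], A[i][1], B[1][k], and so on; the function name is hypothetical.

```python
def reduce_pairwise(key, values, n):
    """Sum of products of consecutive pairs: v0*v1 + v2*v3 + ...

    Assumes each consecutive pair of values belongs together.
    """
    if len(values) != 2 * n:
        raise ValueError(f"expected {2 * n} values for {key}, got {len(values)}")
    pairs = iter(values)
    # zip(pairs, pairs) takes two items at a time from the same iterator.
    total = sum(a * b for a, b in zip(pairs, pairs))
    return key, total

print(reduce_pairwise((1, 0), [1, 4, 2, 5, 3, 6], 3))  # ((1, 0), 32)
```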

So the question now is, how can we make sure that values arrive in the order 1,4,2,5,3,6? Here is a simple idea. What if the tuples arriving at reduce are:

```text
( (1,0,0), 1 )
( (1,0,0), 4 )
( (1,0,1), 2 )
( (1,0,1), 5 )
( (1,0,2), 3 )
( (1,0,2), 6 )
```

The only thing wrong with this suggestion is that the sort might mix up 1 and 4, or 2 and 5; the arrival order might be something like 4,1,2,5,6,3 because some keys are duplicated in the arriving tuples. But, fortunately, this doesn't matter to the calculation! (Because multiplication is commutative.)

Now we have some idea about reduce. In fact, a reducer might get all the tuples for C[1][0] and also all the tuples for C[2][1]. That would mean more programming, but in principle it's not difficult to deal with: each group of tuples whose keys start with the same two numbers belongs to one position in the output matrix, so reduce ought to be able to detect this and do the right thing.
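A reducer for the (i,k,j) keys can be sketched with `itertools.groupby`: tuples with the same full key are one A value and one B value, in either order, and the first two key components say which output position the product belongs to. This is an illustration under assumptions: the input list is taken to be sorted by key (with equal keys in arbitrary order), the values for C[2][1] are made up, and unpacking `a, b` deliberately fails if a group does not contain exactly two values.

```python
import random
from collections import defaultdict
from itertools import groupby

def reduce_entries(tuples):
    """tuples: ((i, k, j), value) pairs sorted by key, possibly covering
    several output positions (i, k). Returns {(i, k): C[i][k]}."""
    sums = defaultdict(int)
    for (i, k, j), group in groupby(tuples, key=lambda t: t[0]):
        a, b = (value for _, value in group)  # one A value and one B value, either order
        sums[(i, k)] += a * b
    return dict(sums)

tuples = [((1, 0, 0), 1), ((1, 0, 0), 4), ((1, 0, 1), 2),
          ((1, 0, 1), 5), ((1, 0, 2), 3), ((1, 0, 2), 6),
          # made-up values for a second position, C[2][1]
          ((2, 1, 0), 2), ((2, 1, 0), 3), ((2, 1, 1), 1),
          ((2, 1, 1), 1), ((2, 1, 2), 0), ((2, 1, 2), 5)]
random.shuffle(tuples)
tuples.sort(key=lambda t: t[0])  # sort on the key only, so equal keys keep a random order
print(reduce_entries(tuples))  # {(1, 0): 32, (2, 1): 7}
```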

What about map? Let's consider what the map does for matrix A. If matrix A is input in row order, then the items of the second row will be seen as a sequence. The map function will need to do the following, at least on account of C[1][0]'s calculation:

```
A[1][0] --> emit( ((1,0,0),val) )
A[1][1] --> emit( ((1,0,1),val) )
A[1][2] --> emit( ((1,0,2),val) )
```

(obviously there would be more statements, and "val" would be assigned from the input, and all of these things would be generated in a loop)

Notice that the second index for A[][] determines the third value of the emitted tuple's key, which is convenient. Similar reasoning will find the pattern for processing the input matrix B.

One important thing missing from the story is how other elements of the output matrix C would be woven into the above. Let's consider what has to happen when A[1][0] is read by the map function. Not only does map need to emit a tuple on account of C[1][0], but it also has to emit a tuple for C[1][1] and C[1][2]. So, actually we will need to have something like:

```
A[1][0] --> emit( ((1,0,0),val) )   // for C[1][0]
            emit( ((1,1,0),val) )   // for C[1][1]
            emit( ((1,2,0),val) )   // for C[1][2]
```

There's more to do in order to get this all working, but at least we have a start for the design of the program. Notice how the reasoning was backward, starting with reduce, then figuring out what map would need to do.
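Putting the pieces together, a minimal single-process sketch of the whole design might look like the following. It assumes square N×N matrices held in memory as Python lists, uses a dictionary to stand in for the shuffle phase, and fills the unknown "?" entries of the example with 0 purely for illustration; the function names are hypothetical. It also counts the intermediate tuples, which come to 2N^3 with this scheme.

```python
from collections import defaultdict

def map_a(i, j, val, n):
    # A[i][j] contributes to C[i][k] for every k, so emit it n times.
    for k in range(n):
        yield (i, k, j), val

def map_b(j, k, val, n):
    # B[j][k] contributes to C[i][k] for every i, so emit it n times.
    for i in range(n):
        yield (i, k, j), val

def matmul_mapreduce(A, B):
    n = len(A)
    groups = defaultdict(list)  # stands in for shuffle: values grouped by (i, k, j)
    for i in range(n):
        for j in range(n):
            for key, val in map_a(i, j, A[i][j], n):
                groups[key].append(val)
    for j in range(n):
        for k in range(n):
            for key, val in map_b(j, k, B[j][k], n):
                groups[key].append(val)
    # Reduce: each (i, k, j) group holds exactly one A value and one B value.
    C = [[0] * n for _ in range(n)]
    for (i, k, j), (a, b) in groups.items():
        C[i][k] += a * b
    return C, sum(len(v) for v in groups.values())

A = [[0, 0, 0], [1, 2, 3], [0, 0, 0]]  # unknown "?" entries filled with 0
B = [[4, 0, 0], [5, 0, 0], [6, 0, 0]]
C, num_tuples = matmul_mapreduce(A, B)
print(C[1][0], num_tuples)  # 32 54
```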

## MapReduce V

The previous example on matrix multiplication wasn't very specific about the *input* to the map function. In Google's MapReduce, the map function typically has a signature something like

map(key,value)

where "key" is usually a string and "value" can stand for some large amount of data. In class, it was suggested that a more flexible way to specify map would be

map(key,generator)

where **generator** could be a function that map's program can call to get bytes or fields of input data. For the matrix multiply problem, here would be a reasonable map function (not exactly fitting either of the two forms above):

map((AFile,BFile))

Here, the map function doesn't have a "value" or a generator. Instead, there is only a key, which is a pair of file descriptors or filenames. There is no generator because map is being programmed in the context of Unix or Windows, where there is a standard way to read from files. So, in pseudo-code, the program for map might look roughly like the following:

```
map((AFile,BFile)):
    A = open(AFile)
    // suppose it is known that each matrix is N by N, and that
    // each file lists items of the matrix in row order, with row zero
    // first and row N-1 last
    for i in 0..(N-1):
        for j in 0..(N-1):
            x = read(A)   // reads A[i][j]
            for k in 0..(N-1):
                Emit(?)
    close(A)
    B = open(BFile)
    // need to "distribute B" much like was done for A
    for i in 0..(N-1):
        for j in 0..(N-1):
            y = read(B)   // reads B[i][j]
            for k in 0..(N-1):
                Emit(?)
    close(B)
```

- notice that the emit statements are incomplete (the ? part); working out what they should contain is an exercise
- notice that for each element of an input matrix, N emit statements are executed. This makes MapReduce work harder, because it has many more than N*N tuples (2N^3 in total) to process in the intermediate shuffle-and-sort phase

- most systems allow programs to close a file and then open it again, starting from the beginning. Can we devise some way for map to open, close, and reopen the file for matrix B many times (maybe N times) so that the tuples sent to the reducer have a different form, in which the multiplication of corresponding A and B elements has already been done in the map phase?

**Note:** the largest defect in the code sketch above is that it seems to consider only *one mapper*, that is, as though only one thread or one machine were processing the input files. In practice, we want many mappers running in parallel. This means that the input files for A and B should be split into parts, one per mapper. This is possible provided that the disk addresses for different ranges of elements within A and B can be calculated and passed as arguments to map().
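One way the disk addresses could be calculated is sketched below, under an assumed file layout that the text does not specify: each element is stored as an 8-byte little-endian double, in row order, so element (i, j) starts at byte (i*N + j) * 8. Each (start, end) range of element indices is what one mapper would receive as its argument, and the mapper seeks straight to its first element. The helper names and the sample file are hypothetical.

```python
import os
import struct
import tempfile

ITEM = struct.Struct("<d")  # assumption: each element is an 8-byte little-endian double

def write_matrix(path, matrix):
    # Store the matrix in row order, row zero first, as the map sketch assumes.
    with open(path, "wb") as f:
        for row in matrix:
            for x in row:
                f.write(ITEM.pack(x))

def make_splits(n, num_mappers):
    """Divide the n*n element indices into contiguous [start, end) ranges."""
    total = n * n
    step = -(-total // num_mappers)  # ceiling division
    return [(s, min(s + step, total)) for s in range(0, total, step)]

def read_split(path, n, start, end):
    """Yield (i, j, value) for elements start..end-1, seeking directly to them."""
    with open(path, "rb") as f:
        f.seek(start * ITEM.size)  # byte offset = element index * element size
        data = f.read((end - start) * ITEM.size)
    for offset, (x,) in enumerate(ITEM.iter_unpack(data)):
        i, j = divmod(start + offset, n)
        yield i, j, x

A = [[0.0, 0.0, 0.0], [1.0, 2.0, 3.0], [0.0, 0.0, 0.0]]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "A.bin")
    write_matrix(path, A)
    splits = make_splits(3, 2)  # [(0, 5), (5, 9)]
    # Each split is read independently, as a separate mapper would.
    parts = [list(read_split(path, 3, s, e)) for s, e in splits]
print(parts[1][0])  # (1, 2, 3.0)
```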

## MapReduce VI

What if matrices A and B are very large, say N = 10,000? Then each matrix will have 10^8 elements, and the approach given above would generate 2 × 10^12 tuples (that is, tuples for N different A items and tuples for N different B items, for each of the 10^8 elements of C). Depending on the cluster's disk storage capacity, this might not be a feasible approach. In such a case, we might try a different idea, namely to arrange for the reduce tasks to do only additions, and for the map tasks to perform the multiplications. For example, the reducer for C[1][0] could receive just three tuples:

```text
( (1,0), 4 )    // 1*4
( (1,0), 10 )   // 2*5
( (1,0), 18 )   // 3*6
```

For this to occur, we would need to have the matrix B read multiple times, so that each element of B somehow gets to be available when the corresponding element of A is already in memory. In other words, we might be able to read matrix A once, but matrix B would be read numerous times. Why? There are two answers.

- From the viewpoint of doing this all with a single mapper task, that mapper may not have enough memory to hold all of A and all of B in memory. Therefore reading B multiple times seems to be necessary.
- Suppose we have N mappers, perhaps one mapper for each row of A. Then, for each row of A, we need to arrange that all columns of B are read in order to match up the corresponding items needed to compute C. If a mapper can hold one row of A in memory, then it is enough for that mapper to read B just once: the mapper can actually calculate the dot products of A-rows and B-columns directly, and the reducer's job becomes trivial because the mapper does all the work. But if the data of an A-row is too big for memory, then B might need to be read multiple times to produce the desired tuples for the reducers.
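The second scenario can be sketched as one mapper per row of A: the mapper holds its A-row in memory, streams B once in row order, and emits one product per tuple, keyed by the output position, so the reducer only adds. This emits N^3 intermediate tuples instead of 2N^3. A real mapper could also pre-sum its products locally (the Google paper calls such a local pre-aggregation step a combiner); the per-product form is kept here to match the three-tuple example. Names are hypothetical and the "?" entries are filled with 0 for illustration.

```python
from collections import defaultdict

def row_mapper(i, a_row, b_rows):
    """Mapper for row i of A. a_row is held in memory; B is streamed once,
    row by row, as (j, B[j]) pairs in row order."""
    for j, b_row in b_rows:
        a_ij = a_row[j]
        for k, b_jk in enumerate(b_row):
            # One product per tuple, keyed by the output position (i, k).
            yield (i, k), a_ij * b_jk

def sum_reducer(key, values):
    # The reducer only adds; all multiplications happened in the mappers.
    return key, sum(values)

def matmul_products(A, B):
    groups = defaultdict(list)  # stands in for the shuffle phase
    for i, a_row in enumerate(A):
        # Each mapper reads B from the beginning (here: a fresh enumerate).
        for key, product in row_mapper(i, a_row, enumerate(B)):
            groups[key].append(product)
    C = [[0] * len(B[0]) for _ in A]
    for key, values in groups.items():
        (i, k), total = sum_reducer(key, values)
        C[i][k] = total
    return C, groups

A = [[0, 0, 0], [1, 2, 3], [0, 0, 0]]  # "?" entries filled with 0 for illustration
B = [[4, 0, 0], [5, 0, 0], [6, 0, 0]]
C, groups = matmul_products(A, B)
print(groups[(1, 0)], C[1][0])  # [4, 10, 18] 32
```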

## Exercises

Read about database "join" operations, either from a formal perspective (say Join (relational algebra)) or from database definitions (say Join (SQL)). Can you program a join using MapReduce? (There are many kinds of join, so just pick a simple one like a natural join on equality for a single key.)

Suppose a graph or a relation is given in tuple form: the input will have tuples (v,w) and (w,v) if there is an edge between v and w; or, if we are thinking about a relation, then v is related to w. The exercise is to take the input set of tuples

{ (v,w) | v is related to w }

and produce a new set of tuples,

{ (k,v) | v belongs to equivalence class k }

where two elements a and b belong to the same equivalence class k if they are transitively related, or, in graph terminology, if there is a path from a to b. If a and b are not transitively related, they belong to different equivalence classes, so we cannot have both (k,a) and (k,b) in the output; instead we will have (k,a) and (k',b), where k is different from k'. The actual values naming the equivalence classes can be anything you like.

A list of numbers is in an input file. The problem is to find the length of the longest *up sequence* in the list. A sequence of numbers is an "up sequence" if the numbers are in increasing order. Because the input is a file, and is likely to be split up among multiple mappers, each mapper will need to know the displacement of its portion of the list. For example, one mapper will have displacement zero, because it starts at the beginning of the list. Another mapper will have displacement 15205 because other mappers took care of the input items at positions 0, 1, ..., 15204. The output should tell us the length of the longest up sequence.

Google has announced a translation service, Google Translate. Unlike some previous approaches based on building an "understanding" of the concepts in some input text, Google's approach is derived more statistically, from patterns in documents. To build a good statistical model, Google searched a very large number of documents in many languages, tabulating the most common N-gram units found in the set of documents. They did this for N=1,2,3,4,5 (and maybe more, I don't know). For this exercise, suppose you have an input file of words and the goal is to find all N-grams with N<6 that occur more than 5,000 times, and output these N-grams.

An earlier exercise, part of the standard training for students of MapReduce, is to take a document and produce, for each word in the document, a list of places (such as byte offsets within the file) where that word occurs. On a more practical scale for a search engine, the problem would be to make a list of URLs associated with each word. Furthermore, there can be some kind of "quality" metric on the URLs, so that each list of URLs can be ordered by quality and a search for a word returns the highest-quality URLs first.
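The basic version of this exercise (word to byte offsets) can be sketched as follows. It is a minimal illustration under stated assumptions: words are split on whitespace with no punctuation handling or case folding, the file names and contents are made up, and a dictionary stands in for the shuffle phase. A quality-ordered URL list would instead sort each posting list by some quality score in reduce.

```python
import re
from collections import defaultdict

def map_offsets(doc_name, data):
    # Map: emit (word, (document, byte offset)) for each whitespace-separated word.
    for m in re.finditer(rb"\S+", data):
        yield m.group().decode("utf-8"), (doc_name, m.start())

def reduce_postings(word, places):
    # Reduce: the posting list for one word, in (document, offset) order.
    return word, sorted(places)

def build_index(docs):
    groups = defaultdict(list)  # stands in for the shuffle phase
    for name, data in docs.items():
        for word, place in map_offsets(name, data):
            groups[word].append(place)
    return dict(reduce_postings(w, p) for w, p in groups.items())

docs = {"f1": b"red boat red", "f2": b"blue boat"}
index = build_index(docs)
print(index["red"], index["boat"])  # [('f1', 0), ('f1', 9)] [('f1', 4), ('f2', 5)]
```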

Suppose we have a solution to this problem of creating URL lists associated with each word, and that the words and associated lists are stored in a file system. Given any word, a program can just calculate a hash key for that word, and directly access the list of URLs. Suppose the list of URLs is very large, and broken down into multiple files, each ordered by quality: so actually, instead of one list, there are multiple lists. Can MapReduce be used to respond to a query looking for the highest quality URLs associated with a word?

Again, suppose we have the list of URLs associated with each word stored in a file system, but this time there is just one list (instead of multiple lists) associated with each word. Now consider a query for two words, for example, find the web pages containing both "red" and "boat". Can MapReduce be used to compute the result of the query? (The hope is to use massive parallelism to get the answer to the search.)

One input to this problem is a very long character string, so long that it is wise to split it up and let multiple mapper tasks process the input in parallel. The other input is a regular expression. The output should be a list of all the substrings of the input that match the regular expression.