
Hadoop’s Contract

We will state very precisely what Hadoop guarantees, so that you can both attach a rigorous understanding to the haiku-level discussion and see how small the contract is. This formal understanding of the contract is very useful for reasoning about how Hadoop jobs work and perform.

Hadoop imposes a few seemingly-strict constraints and provides very few guarantees in return. As you’re starting to see, that simplicity provides great power and is not as confining as it seems. You can gain direct control over things like partitioning, input splits and input/output formats. We’ll touch on a few of those, but for the most part this book concentrates on using Hadoop from the outside — (REF) Hadoop: The Definitive Guide covers this stuff (definitively).

The Mapper’s Input Guarantee

The contract Hadoop presents for a map task is simple, because there isn’t much of one. Each mapper will get a contiguous slice (or all) of some file, split at record boundaries, and in order within the file. You won’t get lines from another input file, no matter how short any file is; you won’t get partial records; and though you have no control over the processing order of chunks ("file splits"), within a file split all the records are in the same order as in the original file.

For a job with no reducer — a "mapper-only" job — you can then output anything you like; it is written straight to disk. For a Wukong job with a reducer, your output should be tab-delimited data, one record per line. You can designate the fields to use for the partition key, the sort key and the group key. (By default, the first field is used for all three.)

The typical job turns each input record into zero, one or many records in a predictable manner, but such decorum is not required by Hadoop. You can read in lines from Shakespeare and emit digits of pi; read in all input records, ignore them and emit nothing; or boot into an Atari 2600 emulator, publish the host and port and start playing Pac-Man. Less frivolously: you can accept URLs or filenames (local or HDFS) and emit their contents; accept a small number of simulation parameters and start a Monte Carlo simulation; or accept a database query, issue it against a datastore and emit each result.
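
To make that concrete, here is a minimal sketch of a mapper in Hadoop’s native Java API (the book’s own examples use Wukong; the class and field names here are ours, purely for illustration). For each input line it may emit zero records, or one record per word: nothing obliges a mapper to produce exactly one output per input.

[source,java]
----
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// A hypothetical mapper: Hadoop promises only that map() sees each record of
// its split, whole and in file order. What you emit is entirely up to you.
public class WordsMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  private static final LongWritable ONE = new LongWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    String text = line.toString().trim();
    if (text.isEmpty()) { return; }             // zero output records for a blank line
    for (String token : text.split("\\s+")) {   // otherwise, one record per word
      word.set(token);
      context.write(word, ONE);                 // with TextOutputFormat, rendered as "word <TAB> 1"
    }
  }
}
----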

The Reducer’s Input Guarantee

When Hadoop does the group/sort, it establishes the following guarantee for the data that arrives at the reducer:

  • each labelled record belongs to exactly one sorted group;

  • each group is processed by exactly one reducer;

  • groups are sorted lexically by the chosen group key;

  • and records are further sorted lexically by the chosen sort key.

It’s very important that you understand what that unlocks, so we’re going to redundantly spell it out a few different ways:

  • Each mapper-output record goes to exactly one reducer, solely determined by its key.

  • If several records have the same key, they will all go to the same reducer.

  • From the reducer’s perspective, if it sees any element of a group it will see all elements of the group.

You should typically think in terms of groups and not about the whole reduce set: imagine each partition is sent to its own reducer. It’s important to know, however, that each reducer typically sees multiple partitions. (Since it’s more efficient to process large batches, a certain number of reducer processes are started on each machine. This is in contrast to the mappers, which run one task per input split.) Unless you take special measures, the partitions are distributed arbitrarily among the reducers [1]. They are fed to the reducer in order by key.
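
Here is the matching sketch of a reducer in the Java API (again, the class names are ours). Hadoop calls reduce() once per group, handing it the group key and an iterator over every record that shares it; groups arrive in sorted key order, but which reducer sees which group is up to Hadoop’s partitioning, as described above.

[source,java]
----
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A hypothetical reducer: one reduce() call per group, with every value
// labelled by that group key. If it sees any element of a group, it sees all.
public class WordTotalReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  private final LongWritable total = new LongWritable();

  @Override
  protected void reduce(Text word, Iterable<LongWritable> counts, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable count : counts) { sum += count.get(); }  // every record in the group
    total.set(sum);
    context.write(word, total);  // one output per group here; zero or many would be equally legal
  }
}
----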

Similar to a mapper-only task, your reducer can output anything you like, in any format you like. It’s typical to output structured records of the same or different shape, but you’re free to engage in any of the shenanigans listed above.

Note
The traditional terms for the Hadoop phases are very unfortunately chosen. The name "map" isn’t that bad, though it sure gets confusing when you’re using a HashMap in the map phase of a job that maps locations to coordinates for a mapping application. Things get worse after that, though. Hadoop identifies two phases, called shuffle and sort, between the map and reduce. That division is irrelevant to you, the end user, and not even that essential internally. "Shuffling" is usually taken to mean "placing in random order", which is exactly not the case. And at every point of the intermediate phase, on both mapper and reducer, the data is being sorted (rather than only right at the end). This is horribly confusing, and we won’t use those terms. Instead, we will refer to a single intermediate phase called the "group-sort phase". Last and worst is the term "Reducer". There is no obligation on a reducer that it eliminate data, that its output be smaller in size or fewer in count than its input, that its output combine records from its input, or even that it pay attention to its input at all. Reducers quite commonly emit more data than they receive, and if you’re not careful, explosively so. We’re stuck with the name "Map/Reduce", and so we’re also stuck calling this the "Reduce" phase, but put any concept of reduction out of your mind.

The Map Phase Processes Records Individually

The Map phase receives 0, 1 or many records individually, with no guarantees from Hadoop about their numbering, order or allocation. [2] Hadoop does guarantee that every record arrives in whole to exactly one Map task and that the job will only succeed if every record is processed without error.

The Mapper receives those records sequentially — it must fully process one before it receives the next — and can emit 0, 1 or many records of any shape or size in response. The chimpanzees working on the SantaCorp project received letters but dispatched toy forms. Julia’s thoughtful note produced two toy forms, one for her doll and one for Joe’s robot, while the spam letter produced no toy forms.

You can take this point to an arbitrary extreme. Now, the right way to bring in data from an external resource is by creating a custom loader or input format (see the chapter on Advanced Pig (REF)), which decouples loading data from processing data and allows Hadoop to intelligently manage tasks. There is also a poor-man’s version of a custom loader, useful for one-offs: prepare a small number of file names, URLs, database queries or other external handles as input, and have the mapper emit the corresponding contents.
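
Here is a sketch of that poor-man’s loader (the class name is ours, and this is strictly for one-offs): a mapper whose input records are HDFS paths, and which emits each file’s contents. Reading from the HDFS this way is unobjectionable; the caution in the next paragraph concerns truly external resources.

[source,java]
----
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Poor-man's loader: each input record names an HDFS file; the mapper opens
// it and emits its lines. A custom InputFormat is the grown-up alternative.
public class FetchContentsMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  @Override
  protected void map(LongWritable offset, Text pathString, Context context)
      throws IOException, InterruptedException {
    Path path = new Path(pathString.toString().trim());
    FileSystem fs = path.getFileSystem(context.getConfiguration());
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)))) {
      String line;
      while ((line = reader.readLine()) != null) {
        context.write(new Text(line), NullWritable.get());
      }
    }
  }
}
----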

Please be aware, however, that it is only appropriate to access external resources from within a Hadoop job in exceptionally rare cases. Hadoop processes data in batches, which means failure of a single record results in the retry of the entire batch. It also means that when the remote resource is unavailable or responding sluggishly, Hadoop will spend several minutes and unacceptably many retries before abandoning the effort. Lastly, Hadoop is designed to drive every system resource at its disposal to its performance limit. [3]

For another extreme example, Hadoop’s 'distcp' utility, used to copy data from cluster to cluster, moves around a large amount of data yet has only a trivial input and trivial output. In a distcp job, each mapper’s input is a remote file to fetch; the action of the mapper is to write the file’s contents directly to the HDFS as a datanode client; and the mapper’s output is a summary of what was transferred.

While a haiku with only its first line is no longer a haiku, a Hadoop job with only a Mapper is a perfectly acceptable Hadoop job, as you saw in the Pig Latin translation example. In such cases, each Map Task’s output is written directly to the HDFS, one file per Map Task. Such jobs are only suitable, however, for so-called "embarrassingly parallel" problems — where each record can be processed on its own with no additional context.

The Map stage in a Map/Reduce job has a few extra details. It is responsible for labeling the processed records for assembly into context groups. Hadoop files each record into the equivalent of the pygmy elephants' file folders: an in-memory buffer holding each record in sorted order. There are two additional wrinkles, however, beyond what the pygmy elephants provide. First, the Combiner feature lets you optimize certain special cases by preprocessing partial context groups on the Map side; we will describe these more in a later chapter (REF). Second, if the sort buffer reaches or exceeds a total count or size threshold, its contents are "spilled" to disk and subsequently merge-sorted to produce the Mapper’s proper output.
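
For a taste of how the Combiner plugs in, here is a hypothetical job driver wired around the mapper and reducer sketches above (again, our names, not a canonical recipe). Because summing counts is associative, the reducer class can double as the combiner, pre-summing partial groups in the map-side sort buffer before they are spilled and shipped.

[source,java]
----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver showing where the combiner is declared (Hadoop 2.x style).
public class WordTotalJob {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word totals");
    job.setJarByClass(WordTotalJob.class);
    job.setMapperClass(WordsMapper.class);         // from the sketches above
    job.setCombinerClass(WordTotalReducer.class);  // map-side preprocessing of partial groups
    job.setReducerClass(WordTotalReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
----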

The Hadoop Contract

Here in one place is a casually rigorous summation of the very few guarantees Hadoop provides your Map/Reduce program. Understanding these is a critical tool for helping you to create and reason about Hadoop workflows.

  • Each record is processed in whole by exactly one Mapper.

  • Each Mapper receives records from exactly one contiguous split of input data, in the same order as those records appear in the source.

  • There are no guarantees on how long a split is, how many there are, the order in which they are processed or the assignment of split to Mapper slot.

  • In both Mapper and Reducer, there is no requirement on you to use any of the structure described here or even to use the records' contents at all. You do not have to do anything special when a partition or group begins or ends and your program can emit as much or as little data as you like before, during or after processing its input stream.

  • In a Mapper-only job, each Mapper’s output is placed in exactly one uniquely-named, immutable output file in the order the records were emitted. There are no further relevant guarantees for a Mapper-Only job.

  • Each Mapper output record is processed in whole by exactly one Reducer.

  • Your program must provide each output record with a label consisting of a partition key, a group key and a sort key; the partition key governs which Reducer receives the record, while the group and sort keys govern how records are grouped and ordered within that Reducer’s input.

  • All records sharing a partition key are sent to the same Reducer; if a Reducer sees one record from a partition, it will see all records from that partition, and no other Reducer will see any record from that partition.

  • Partitions are sent contiguously to the Reducer; if a Reducer receives one record from a partition, it will receive all of them in a stretch, and will never again see a record from a prior partition.

  • Partitions themselves are ordered by partition key within the Reducer input.

  • A custom partitioner can assign each partition to a specific Reducer, but you should not depend on any particular pairing provided by the default partitioner (the HashPartitioner).

  • Within each partition, records are sent within contiguous groups; if a Reducer receives one record from a group, it will receive all of them in a stretch, and will never again see a record from a prior group.

  • Within a partition, records are sorted first by the group key, then by the sort key; this means groups themselves are ordered by group key within the Reducer input. (TECHREVIEW: Check that this is consistent with the Java API and the Pig UDF API.)

  • Each Reducer’s output is placed in exactly one uniquely-named, immutable output file in the order the records were emitted.

You can tell how important we feel it is for you to internalize this list of guarantees, or we would not have gotten all, like, formal and stuff.

How Hadoop Manages Midstream Data

The first part of this chapter (REF) described the basics of what Hadoop supplies to a Reducer: each record is sent to exactly one reducer; all records with a given label are sent to the same Reducer; and all records for a label are delivered in a contiguous, ordered group. Let’s understand the remarkably economical motion of data Hadoop uses to accomplish this.

Mappers Spill Data In Sorted Chunks

As your Map task produces each labeled record, Hadoop inserts it into a memory buffer according to its order. Just as with the dexterous chimpanzee, modern CPUs and memory are fast enough that this initial ordering imposes negligible overhead compared to the rate at which data can be read and processed. When the Map task concludes or that memory buffer fills, its contents are flushed as a stream to disk. The typical Map task operates on a single HDFS block and produces output not much larger, so a well-configured Hadoop cluster sets the sort buffer size accordingly [4].
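
For reference, the sort buffer is an ordinary configuration knob. The sketch below uses the Hadoop 1.x property names ("io.sort.mb" and "io.sort.spill.percent"; newer releases call them "mapreduce.task.io.sort.mb" and "mapreduce.map.sort.spill.percent"), and the numbers shown are illustrative, not tuning advice.

[source,java]
----
import org.apache.hadoop.conf.Configuration;

// A minimal sketch of the map-side sort-buffer settings; values are illustrative.
public class SortBufferSettings {
  public static Configuration tuned() {
    Configuration conf = new Configuration();
    conf.setInt("io.sort.mb", 200);                 // sort buffer size, in megabytes
    conf.setFloat("io.sort.spill.percent", 0.80f);  // start spilling when the buffer is 80% full
    return conf;
  }
}
----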

If there are multiple spills, Hadoop performs the additional action of merge-sorting the chunks into a single spill. [5]

As you know, each record is sent to exactly one Reducer. The label for each record actually consists of two important parts: the partition key that determines which Reducer the record belongs to, and the sort key, which groups and orders those records within the Reducer’s input stream. You will notice that, in the programs we have written, we only had to supply the record’s natural label and never had to designate a specific Reducer; Hadoop handles this for you by applying a partitioner to the key.
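
In the Java API, the parts of the label correspond to three pluggable classes on the Job: the partitioner, the sort comparator and the grouping comparator. The sketch below simply spells out the usual defaults for Text keys (the class name is ours) so you can see which knob governs which part of the contract.

[source,java]
----
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Where each part of the label plugs in; these are the usual defaults for Text
// keys, named explicitly only to make the mapping visible.
public class LabelWiring {
  public static void configure(Job job) {
    job.setPartitionerClass(HashPartitioner.class);         // partition key: which Reducer gets the record
    job.setSortComparatorClass(Text.Comparator.class);      // sort key: order of records in the Reducer's input
    job.setGroupingComparatorClass(Text.Comparator.class);  // group key: which consecutive records form one group
  }
}
----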

Partitioners Assign Each Record To A Reducer By Label

The default partitioner, which we find meets almost all our needs, is the HashPartitioner. [6] It aims to distribute records uniformly across the Reducers by giving each key the same chance to land on any given Reducer. It is not random in the sense of nondeterministic; running the same job with the same configuration will distribute records the same way. Rather, it achieves a uniform distribution of keys by computing a hash digest — a number produced from the key with the property that even a small change to the key yields an unrelated number. Since the numbers thus produced are uniformly distributed, the digest MODULO the number of Reducers reliably balances the keys across Reducers, no matter their raw shape and size. [7]
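
The whole trick fits in one line of Java. The class below (the name is ours) does essentially what the default HashPartitioner does: hash the key, mask off the sign bit and take the result MODULO the number of Reducers.

[source,java]
----
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Deterministic, but uniform in practice: the same key always lands on the
// same Reducer, while distinct keys scatter evenly across Reducers.
public class HashLikePartitioner extends Partitioner<Text, LongWritable> {
  @Override
  public int getPartition(Text key, LongWritable value, int numReducers) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
  }
}
----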

Note
The default partitioner aims to provide a balanced distribution of keys — which does not at all guarantee a uniform distribution of records! If 40 percent of your friends have the last name Chimpanzee and 40 percent have the last name Elephant, running a Map/Reduce job on your address book, partitioned by last name, will send all the Chimpanzees to one Reducer and all the Elephants to one Reducer (and, if you are unlucky, possibly even the same one). Those unlucky Reducers will struggle through 80 percent of the data while the remaining Reducers race through their unfairly small share of what is left. This situation is far more common and far more difficult to avoid than you might think, so large parts of this book’s intermediate chapters are, in effect, tricks for avoiding it.

Reducers Receive Sorted Chunks From Mappers

Partway through your job’s execution, you will notice its Reducers spring to life. As each Map task concludes, its final merged spill is sent over the network to the appropriate Reducers [8]. Just as above, each Reducer files the incoming records into a sort buffer, spills that buffer to disk as it fills, and begins merge-sorting the spills once their number crosses a threshold.

Whereas the numerous Map tasks typically skate by with a single spill to disk, you are best off running a number of Reducers equal to or somewhat smaller than the number of available Reducer slots. This generally means a much larger amount of data per Reducer and, thus, multiple spills.

Reducers Read Records With A Final Merge/Sort Pass

The Reducers do not need to merge all records into a single unified spill. Just as the elves at each workbench pull directly from a limited number of parts carts as they work, once the number of mergeable spills is small enough the Reducer begins processing records from those spills directly, each time choosing the next record in sorted order.

Your program’s Reducer receives the records from each group in sorted order, outputting records as it goes. Your reducer can output as few or as many records as you like at any time: on the start or end of its run, on any record, or on the start or end of a group. It is not uncommon for a job to produce output the same size as or larger than its input — "Reducer" is a fairly poor choice of names. Those output records can also be of any size, shape or format; they do not have to resemble the input records, and they do not even have to be amenable to further Map/Reduce processing.

Reducers Write Output Data and Commit

As your Reducers emit records, they are streamed directly to the job output, typically the HDFS or S3. Since this occurs in parallel with reading and processing the data, the write to the Datanode typically adds minimal overhead.

You may wish to send your job’s output not to the HDFS or S3 but to a scalable database or other external data store. (We’ll show an example of this in the chapter on HBase (REF).) While your job is in development, though, it is typically best to write its output directly to the HDFS (perhaps at replication factor 1), then transfer it to the external target in a separate stage. The HDFS is generally the most efficient output target and the least likely to struggle under load. This checkpointing also encourages the best practice of sanity-checking your output and asking questions.

A Quick Note on Storage (HDFS)

If you’re a Hadoop administrator responsible for cluster setup and maintenance, you’ll want to know a lot about Hadoop’s underlying storage mechanism, called HDFS. As an analyst who writes jobs to run on a Hadoop cluster, though, you need to know just one key fact:

HDFS likes big files.

Put another way, HDFS doesn’t like small files, and "small" here means anything that weighs in at less than 64 megabytes, the default HDFS block size. If you’re interested in the technical specifics, check out the blog post on "The Small Files Problem" [9]. Mostly, you just need to know that small files will gum up the works.

This often leads people to ask: "How do I use Hadoop on, say, image analysis? I want to process a large number of images that are each only a few kilobytes in size." For that, check out a Hadoop storage format called a SequenceFile. [10]
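
As a sketch of the idea (the class name, paths and arguments are made up for illustration), the program below packs many small local files into one SequenceFile keyed by filename, so Hadoop sees a single big, splittable file instead of thousands of tiny ones.

[source,java]
----
import java.io.File;
import java.nio.file.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Usage (hypothetical): hadoop jar pack.jar PackImages /data/images.seq img1.png img2.png ...
// Each record's key is the original filename; its value is the file's raw bytes.
public class PackImages {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, new Path(args[0]), Text.class, BytesWritable.class);
    try {
      for (int i = 1; i < args.length; i++) {
        byte[] bytes = Files.readAllBytes(new File(args[i]).toPath());
        writer.append(new Text(args[i]), new BytesWritable(bytes));
      }
    } finally {
      writer.close();
    }
  }
}
----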


1. Using a "consistent hash"; see (REF) the chapter on Statistics
2. In special cases, you may know that your input bears additional guarantees — for example, the "Merge Join" described in Chapter (REF) requires its inputs to be in total sorted order. It is on you, however, to enforce and leverage those special properties.
3. We will drive this point home in the chapter on Event Log Processing (REF), where we will stress test a web server to its performance limit by replaying its request logs at full speed.
4. The chapter on Hadoop Tuning For The Brave And Foolish (REF) shows you how; that most common case produces only a single spill.
5. This can be somewhat expensive, so in Chapter (REF) we will show you how to avoid unnecessary spills. Whereas the pygmy elephants each belonged to a distinct workbench, a Hadoop Mapper produces only that one unified spill. That’s ok — it is easy enough for Hadoop to direct the records as each is sent to its Reducer.
6. In the next chapter (REF), you will meet another partitioner, when you learn how to do a total sort.
7. If you will recall, x MODULO y gives the remainder after dividing x and y. You can picture it as a clock with y hours on it: 15 MODULO 12 is 3; 4 MODULO 12 is 4; 12 MODULO 12 is 0.
8. Note that this communication is direct; it does not use the HDFS.
10. Also, Q wrote a handy tool to wrap up your small files into big SequenceFiles. Check out forqlift at http://qethanm.cc/projects/forqlift/