
Hadoop Internals

For 16 chapters now, we’ve been using Hadoop and Storm+Trident from the outside. The biggest key to writing efficient dataflows is to understand the interface and fundamental patterns of use, not the particulars of how these frameworks execute the dataflow. However, even the strongest abstractions, pushed far enough, can leak, and so at some point it’s important to understand these internal details. These next few chapters concentrate on equipping you to understand your jobs’ performance, along with practical tips for improving it; if you’re looking for more, by far the best coverage of this material is found in (TODO: add links) Tom White’s Hadoop: The Definitive Guide and Eric Sammer’s Hadoop Operations.

Let’s first focus on the internals of Hadoop.

HDFS (NameNode and DataNode)

It’s time to learn how the HDFS stores your data; how the Map/Reduce framework launches and coordinates job attempts; and what happens within the framework as your Map/Reduce process executes.

The HDFS provides three remarkable guarantees: durability (your data is never lost or corrupted), availability (you can always access it) and efficiency (you can consume the data at a high rate, especially from Map/Reduce jobs and other clients). The center of action, as you might guess, is the NameNode. The NameNode is a permanently running daemon process that tracks the location of every block on the network of DataNodes, along with its name and other essential metadata. (If you’re familiar with the File Allocation Table (FAT) of a traditional file system, it’s like a more active version of that. FOOTNOTE: [If you’re not familiar with what a FAT is, then it’s like the system you’re reading about here, but for a single-machine file system.])

(NOTE: check something … appending)

When a client wishes to create a file, it contacts the NameNode with the desired path and high-level metadata. The NameNode records that information in an internal table and identifies the DataNodes that will hold the data. The NameNode then replies with that set of DataNodes, identifying one of them as the initial point of contact. (When we say "client", that’s anything accessing the NameNode, whether it’s a Map/Reduce job, one of the Hadoop filesystem commands or any other program.) The file is now exclusively available to the client for writing but will remain invisible to anybody else until the write has concluded (TODO: Is it when the first block completes or when the initial write completes?).

Within its request, the client may independently prescribe a replication factor, file permissions, and block size. (TODO: fill in)
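For instance, with Hadoop’s Java FileSystem API a client can pass those per-file choices directly to the create call. A minimal sketch, assuming a hypothetical path and illustrative figures:

[source,java]
----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class CreateWithOptions {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();          // reads core-site.xml, hdfs-site.xml
    FileSystem fs = FileSystem.get(conf);              // the NameNode is on the other end

    FSDataOutputStream out = fs.create(
        new Path("/data/logs/events.tsv"),             // hypothetical path
        new FsPermission((short) 0644),                // file permissions
        true,                                          // overwrite if it already exists
        conf.getInt("io.file.buffer.size", 4096),      // client-side buffer size
        (short) 2,                                     // replication factor for this file only
        128 * 1024 * 1024L,                            // block size: 128 MB
        null);                                         // no progress callback
    out.writeBytes("hello, HDFS\n");
    out.close();
  }
}
----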

The client now connects to the indicated DataNode and begins sending data. Once a full block’s worth of data has been written, the DataNode transparently finalizes that block and begins another (TODO: check that it’s the DataNode that does this). As it finishes each block, or at the end of the file, it independently prepares a checksum of that block, radioing it back to the NameNode, and begins replicating its contents to the other DataNodes. (TODO: Is there an essential end-of-file ritual?) This is all transparent to the client, who is able to send data as fast as it can cram it through the network pipe.

Once you’ve created a file, its blocks are immutable — unlike in a traditional file system, there is no mechanism for modifying their contents in place. This is not a limitation; it’s a feature. Making the file system immutable not only radically simplifies its implementation, it makes the system more predictable operationally and simplifies client access. For example, you can have multiple jobs and clients access the same file knowing that a client in California hasn’t modified a block being read in Tokyo (or, even worse, that someone in Berlin is modifying it simultaneously). (TODO: When does append become a thing?)

The end of the file means the end of its data but not the end of the story. At all times, the DataNode periodically reads a subset of its blocks to verify their checksums and sends a "heartbeat" back to the NameNode with the (hopefully) happy news. (TODO: fill in). There are several reasons a NameNode will begin replicating a block. If a DataNode’s heartbeat reports an incorrect block checksum, the NameNode will remove that DataNode from the list of replica holders for that block, triggering re-replication from one of the remaining DataNodes holding that block. If the NameNode has not received a heartbeat from a given DataNode within the configured timeout, it will begin replicating all of that DataNode’s blocks; if that DataNode comes back online, the NameNode calmly welcomes it back into the cluster, cancelling replication of the valid blocks that DataNode holds. Furthermore, if the difference in the amount of data on the most populated and least populated DataNodes grows larger than a certain threshold, or the replication factor for a file is increased, the NameNode will rebalance; you can optionally trigger a rebalance earlier using the hadoop balancer command.

However it’s triggered, there is no real magic; one of the valid replica-holder DataNodes sends the block contents to the new replica holder, which heartbeats back the block once received. (TODO: Check details)

As you can see, the NameNode and its metadata are at the heart of every HDFS story. This is why the High-Availability (HA) NameNode feature in recent versions is so important and should be used in any production installation. It’s even more important, as well, to protect and back up the NameNode’s metadata, which, unfortunately, many people don’t know to do. (TODO: Insert notes on NameNode metadata hygiene previously written).

The NameNode selects the recipient DataNodes with some intelligence. Information travels faster among machines on the same switch, among switches within the same data center, and so forth. However, if a switch fails, all its machines are unavailable, and if a data center fails, all its switches are unavailable, and so forth. When you configure Hadoop, there is a setting that allows you to tell it about your network hierarchy. If you do so, the NameNode will ensure that one replica lies within a distant part of the hierarchy — durability is important above all else — while the remaining replicas are stored within the same rack, providing the most efficient replication. (TODO: Mention the phrase "rack aware.") As permitted within that scheme, it also tries to ensure that the cluster is balanced, preferring DataNodes with less data under management. (TODO: Check that it’s amount of data, not percent free)
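That hierarchy is something you declare: you point Hadoop at a script (or Java class) that maps each host to a rack identifier. A minimal sketch, assuming the Hadoop 1.x property name and a hypothetical script path; in practice this usually lives in core-site.xml rather than in code:

[source,java]
----
import org.apache.hadoop.conf.Configuration;

public class RackAwarenessConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The script receives a list of hostnames/IPs and prints one rack id per host,
    // e.g. "/dc1/rack42".  (In Hadoop 2 the property is net.topology.script.file.name.)
    conf.set("topology.script.file.name", "/etc/hadoop/conf/rack-topology.sh");
    // ... pass `conf` to FileSystem.get(conf) or your JobConf as usual ...
  }
}
----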

(TODO: Does it make contact on every block or at the start of a file?)

The most important feature of the HDFS is that it is highly resilient against failure of its underlying components. Any system achieves resiliency through four mechanisms: act to prevent failures; insulate against failure through redundancy; isolate failure within independent fault zones; and lastly, detect and remediate failures rapidly. (FOOTNOTE: This list is due to James Hamilton (TODO: link), whose blogs and papers are essential reading). The HDFS acts to prevent failures by using file system-based access (it does not go behind the back of the operating system), by being open source (ensuring code is reviewed by thousands of eyes and run at extremely high scale by thousands of users), and so forth. Failure above the hardware level is virtually unheard of. The redundancy is provided by replicating the data across multiple DataNodes and, in recent versions, using the Zookeeper-backed High-Availability NameNode implementation.

The rack awareness, described above, isolates failure using your defined network hierarchy, and at the semantic level, independently for each HDFS block. Lastly, the heartbeat and checksum mechanisms, along with its active replication and monitoring hooks, allow it and its operators to detect and remediate faults rapidly.

S3 File System

The Amazon cloud provides an important alternative to the HDFS: its S3 object store. S3 transparently provides multi-region replication with durability far exceeding even HDFS’, at exceptionally low cost (at time of writing, about $80 per terabyte per month, and decreasing at petabyte and higher scale). What’s more, its archival datastore solution, Glacier, will hold rarely-accessed data at one-tenth that price with even higher durability. (FOOTNOTE: The quoted durability figure puts the engineering risk below, say, the risk of violent overthrow of our government). For machines in the Amazon cloud with a provisioned connection, the throughput to and from S3 is quite acceptable for Map/Reduce use.

Hadoop has a built-in facade for the S3 file system, and you can do more or less all the things you do with an HDFS: list, put and get files; run Map/Reduce jobs to and from any combination of HDFS and S3; read and create files transparently using Hadoop’s standard file system API. There are actually two facades. The s3hdfs facade (confusingly labeled as plain s3 by Hadoop, but which we will refer to here as s3hdfs) stores blocks in individual objects, using the same checksum format as on a DataNode, and stores the NameNode-like metadata separately in a reserved area. The s3n facade instead stores a file as it appears to the Hadoop client, entirely in an S3 object with the corresponding path. When you visit S3 using Amazon’s console or any other standard S3 client, a file written as /my/data.txt via s3n appears as an object named my/data.txt in your bucket, and its contents are immediately available to any such client; the same file written via s3hdfs will appear as objects named for 64-bit identifiers like 0DA37f…​ and with uninterpretable contents. However, s3n cannot store an individual file larger than 5 terabytes. The s3hdfs blocks minorly improve Map/Reduce efficiency and can store files of arbitrary size. All in all, we prefer the s3n facade: the efficiency improvement for the robots does not make up for the impact on convenience for the humans using the system, and it’s a best practice to not make individual files larger than 1 terabyte anyway.
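Either facade is reached through an ordinary Hadoop URI. A minimal sketch of reading a file through the s3n facade, assuming a hypothetical bucket and placeholder credentials:

[source,java]
----
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3nRead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3n.awsAccessKeyId",     "YOUR_ACCESS_KEY");   // placeholder credentials
    conf.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY");

    // s3n:// stores one S3 object per file; s3:// (the s3hdfs facade)
    // would store opaque blocks plus separate metadata instead.
    FileSystem s3 = FileSystem.get(URI.create("s3n://my-bucket/"), conf);
    FSDataInputStream in = s3.open(new Path("s3n://my-bucket/my/data.txt"));
    System.out.println(in.readLine());   // read the first line, just to prove contact
    in.close();
  }
}
----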

The universal availability of client libraries makes S3 a great hand-off point to other systems or other people looking to use the data. We typically use a combination of S3, HDFS and Glacier in practice. Gold data — anything one project produces that another might use — is stored on S3. In production runs, jobs read their initial data from S3 and write their final data to S3 but use an HDFS local to all its compute nodes for any intermediate checkpoints.

When developing a job, we run an initial distcp from S3 onto the HDFS and do all further development using the cluster-local HDFS. The cluster-local HDFS provides better (but not earth-shakingly better) Map/Reduce performance. It is, however, noticeably faster in interactive use (file system commands, launching jobs, etc). Applying the "robots are cheap, humans are important" rule easily justifies the maintenance of the cluster-local HDFS.
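The bulk copy itself is what distcp is for — it runs as a Map/Reduce job and parallelizes the transfer. Purely for illustration, here is roughly what that copy looks like through the plain FileSystem API, single-threaded, with hypothetical paths and S3 credentials assumed to be configured:

[source,java]
----
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class PullFromS3 {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();     // assumes fs.s3n credentials are set here
    FileSystem s3   = FileSystem.get(URI.create("s3n://my-bucket/"), conf);
    FileSystem hdfs = FileSystem.get(conf);       // the cluster-local HDFS

    // Copy the gold data down once; develop against the cluster-local copy.
    FileUtil.copy(s3,   new Path("s3n://my-bucket/gold/events/"),
                  hdfs, new Path("/data/scratch/events/"),
                  false /* don't delete the source */, conf);
  }
}
----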

If you use a cluster-local HDFS in the way described — that is, it holds no gold data, only development and checkpoint artifacts — _ (TODO: fill in). Provision your HDFS to use EBS volumes, not the local (ephemeral) ones. EBS volumes surprisingly offer the same or better throughput than local ones and allow you to snapshot a volume in use, or even kill all the compute instances attached to those volumes and then reattach them to a later incarnation of the cluster. (FOOTNOTE: This does require careful coordination. Our open-source Ironfan framework has all the code required to do so.) Since the EBS volumes have significant internal redundancy, it then becomes safe to run a replication factor of 2 or even 1. For many jobs, the portion of the commit stage spent waiting for all DataNodes to acknowledge replication can become a sizable fraction of the time it takes a Map/Reduce stage to complete. Do this only if you’re an amateur with low stakes or a professional whose colleagues embrace these tradeoffs; nobody ever got fired for using a replication factor of 3.
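If you do run scratch or checkpoint data at a lower replication factor, you can apply it per path rather than cluster-wide. A minimal sketch, assuming a hypothetical checkpoint directory:

[source,java]
----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LowerScratchReplication {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    // Drop the replication factor of existing checkpoint files to 2.
    // (Equivalent to `hadoop fs -setrep 2 <path>` from the shell.)
    for (FileStatus stat : fs.listStatus(new Path("/data/scratch/checkpoints"))) {
      if (!stat.isDir()) {
        fs.setReplication(stat.getPath(), (short) 2);
      }
    }
  }
}
----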

As your S3 usage grows — certainly if you find you have more than, say, a dozen terabytes of data not in monthly use — it’s worth marking that data for storage in Glacier, not S3 (you can only do this, of course, if you’re using the s3n facade). There’s a charge for migrating data and, of course, your time is valuable, but the savings can be enormous.

Chimpanzee and Elephant: A Day at Work

Each day, the chimpanzees’ foreman, a gruff silverback named J.T., hands each chimp the day’s translation manual and a passage to translate as they clock in. Throughout the day, he also coordinates assigning each block of pages to chimps as they signal the need for a fresh assignment.

Some passages are harder than others, so it’s important that any elephant can deliver page blocks to any chimpanzee — otherwise you’d have some chimps goofing off while others are stuck translating King Lear into Kinyarwanda. On the other hand, sending page blocks around arbitrarily will clog the hallways and exhaust the elephants.

The elephants' chief librarian, Nanette, employs several tricks to avoid this congestion.

First, since each chimpanzee typically shares a cubicle with an elephant, it’s most convenient to hand a new page block across the desk rather than carry it down the hall. J.T. assigns tasks accordingly, using a manifest of page blocks he requests from Nanette. Together, they’re able to make most tasks "local".

Second, the page blocks of each play are distributed all around the office, not stored in one book together. One elephant might have pages from Act I of Hamlet, Act II of The Tempest, and the first four scenes of Troilus and Cressida [1]. Also, there are multiple 'replicas' (typically three) of each book collectively on hand. So even if a chimp falls behind, J.T. can depend on some other colleague having a cubicle-local replica. (There’s another benefit to having multiple copies: it ensures there’s always a copy available. If one elephant is absent for the day, leaving her desk locked, Nanette will direct someone to make a xerox copy from either of the two other replicas.)

Nanette and J.T. exercise a bunch more savvy optimizations (like handing out the longest passages first, or having folks who finish early pitch in so everyone can go home at the same time, and more). There’s no better demonstration of power through simplicity.

Brief Anatomy of a Hadoop Job

We’ll go into much more detail in (TODO: ref), but here are the essentials of what you just performed.

Copying files to the HDFS

When you ran the hadoop fs -mkdir command, the Namenode (Nanette’s Hadoop counterpart) simply made a notation in its directory: no data was stored. If you’re familiar with the term, think of the namenode as a 'File Allocation Table (FAT)' for the HDFS.

When you run hadoop fs -put …​, the putter process does the following for each file:

  1. Contacts the namenode to create the file. This also just makes a note of the file; the namenode doesn’t ever have actual data pass through it.

  2. Instead, the putter process asks the namenode to allocate a new data block. The namenode designates a set of datanodes (typically three), along with a permanently-unique block ID.

  3. The putter process transfers the file over the network to the first data node in the set; that datanode transfers its contents to the next one, and so forth. The putter doesn’t consider its job done until a full set of replicas have acknowledged successful receipt.

  4. As soon as each HDFS block fills, even if it is mid-record, it is closed; steps 2 and 3 are repeated for the next block.
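Those same steps run whenever any client writes through Hadoop’s FileSystem API; the block allocation and the datanode pipeline all happen inside the create and write calls. A minimal sketch of a "putter" of your own, with hypothetical paths:

[source,java]
----
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class Putter {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();          // reads core-site.xml, hdfs-site.xml
    FileSystem fs = FileSystem.get(conf);              // talks to the namenode

    // Step 1: 'create' registers the file with the namenode -- no data yet.
    FSDataOutputStream out = fs.create(new Path("/data/gold/shakespeare.txt"));

    // Steps 2-4: as we write, the client library asks the namenode for a new
    // block each time one fills, and streams bytes down the datanode pipeline.
    InputStream in = new BufferedInputStream(new FileInputStream("shakespeare.txt"));
    IOUtils.copyBytes(in, out, conf, true);            // true: close both streams when done
  }
}
----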

Running on the cluster

Now let’s look at what happens when you run your job.

(TODO: verify this is true in detail. @esammer?)

  • Runner: The program you launch sends the job and its assets (code files, etc) to the jobtracker. The jobtracker hands a job_id back (something like job_201204010203_0002 — the datetime the jobtracker started and the count of jobs launched so far); you’ll use this to monitor and if necessary kill the job.

  • Jobtracker: As tasktrackers "heartbeat" in, the jobtracker hands them a set of 'tasks' — the code to run and the data segment to process (the "split", typically an HDFS block).

  • Tasktracker: each tasktracker launches a set of 'mapper child processes', each one an 'attempt' of the tasks it received. (TODO verify:) It periodically reassures the jobtracker with progress and in-app metrics.

  • Jobtracker: the Jobtracker continually updates the job progress and app metrics. As each tasktracker reports a complete attempt, it receives a new one from the jobtracker.

  • Tasktracker: after some progress, the tasktrackers also fire off a set of reducer attempts, similar to the mapper step.

  • Runner: stays alive, reporting progress, for the full duration of the job. As soon as the job_id is delivered, though, the Hadoop job itself doesn’t depend on the runner — even if you stop the process or disconnect your terminal the job will continue to run.
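The client-side portion of that lifecycle — submission and monitoring — looks roughly like the following sketch, written against the classic mapred API of the JobTracker era. The job configuration itself (mapper, reducer, paths, jar) is omitted, so treat this as a shape rather than a template:

[source,java]
----
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class SubmitAndWatch {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // ... set mapper, reducer, input/output paths, job jar, etc. ...

    JobClient client = new JobClient(jobConf);
    RunningJob job = client.submitJob(jobConf);       // hands the job and its assets to the jobtracker
    System.out.println("submitted " + job.getID());   // e.g. job_201204010203_0002

    // The runner just polls; killing this process would not kill the job.
    while (!job.isComplete()) {
      System.out.printf("map %3.0f%%  reduce %3.0f%%%n",
          100 * job.mapProgress(), 100 * job.reduceProgress());
      Thread.sleep(5000);
    }
    System.out.println(job.isSuccessful() ? "done" : "failed");
  }
}
----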

Warning

Please keep in mind that the tasktracker does not run your code directly — it forks a separate process in a separate JVM with its own memory demands. The tasktracker rarely needs more than a few hundred megabytes of heap, and you should not see it consuming significant I/O or CPU.

Chimpanzee and Elephant: Splits

I’ve danced around a minor but important detail that the workers take care of. For the Chimpanzees, books are chopped up into set numbers of pages — but the chimps translate sentences, not pages, and a page block boundary might happen mid-sentence. //// Provide a real world analogous example here to help readers correlate this story to their world and data analysis needs, "…​This example is similar to…​" Amy////

The Hadoop equivalent, of course, is that a data record may cross an HDFS block boundary. (In fact, you can force map-reduce splits to happen anywhere in the file, but the default and typically most-efficient choice is to split at HDFS blocks.)

A mapper will skip the first record of a split if it’s partial and carry on from there. Since there are many records in each split, that’s no big deal. When it gets to the end of the split, the task doesn’t stop processing until it completes the current record — the framework makes the overhanging data seamlessly appear. //// Again, here, correlate this example to a real world scenario; "…​so if you were translating x, this means that…​" Amy////
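Here is the idea in miniature. This is a conceptual sketch against a plain local file, not Hadoop’s actual LineRecordReader; it only shows the skip-the-partial-first-record, finish-the-last-record logic:

[source,java]
----
import java.io.IOException;
import java.io.RandomAccessFile;

// Conceptual sketch of how a line-oriented reader handles a split [start, end):
// records are newline-terminated lines, and a split boundary may fall mid-line.
public class SplitReaderSketch {
  public static void readSplit(RandomAccessFile file, long start, long end) throws IOException {
    file.seek(start);
    if (start != 0) {
      file.readLine();   // skip the partial record; the previous split's reader owns it
    }
    long recordStart = file.getFilePointer();
    while (recordStart < end) {          // any record that *begins* before `end` is ours...
      String record = file.readLine();   // ...even if it runs past `end`; we read it in full
      if (record == null) break;         // end of file
      process(record);
      recordStart = file.getFilePointer();
    }
  }

  private static void process(String record) { /* map logic goes here */ }
}
----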

In practice, Hadoop users only need to worry about record splitting when writing a custom InputFormat or when practicing advanced magick. You’ll see lots of reference to it though — it’s a crucial subject for those inside the framework, but for regular users the story I just told is more than enough detail.

The HDFS: Highly Durable Storage Optimized for Analytics

The HDFS, as we hope you’ve guessed, holds the same role within Hadoop that Nanette and her team of elephants do within C&E Corp. It ensures that your data is always available for use, never lost or degraded and organized to support efficient Map/Reduce jobs. Files are stored on the HDFS as blocks of limited size (128 MB is a common choice). Each block belongs to exactly one file; a file larger than the block size is stored in multiple blocks. The blocks are stored in cooked form as regular files on one of the Datanode’s regular volumes. (Hadoop’s decision to use regular files rather than attempting lower-level access to the disk, as many traditional databases do, helps make it remarkably portable, promotes reliability and plays to the strengths of the operating system’s finely-tuned access mechanisms.)

The HDFS typically stores multiple replicas of each block (three is the universal default, although you can adjust it per file), distributed across the cluster. Blocks within the same file may or may not share a Datanode but replicas never do (or they would not be replicas, would they?). The obvious reason for this replication is availability and durability — you can depend on finding a live Datanode for any block and you can depend that, if a Datanode goes down, a fresh replica can be readily produced.
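You can see that layout for yourself: the FileSystem API will report, for a given file, each block’s offset and length and the Datanodes holding its replicas. A minimal sketch, assuming a hypothetical path:

[source,java]
----
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlockLocations {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus stat = fs.getFileStatus(new Path("/data/gold/shakespeare.txt"));

    // One entry per block; each lists the hosts holding a replica of that block.
    for (BlockLocation block : fs.getFileBlockLocations(stat, 0, stat.getLen())) {
      System.out.printf("offset %12d  length %12d  replicas %s%n",
          block.getOffset(), block.getLength(), Arrays.toString(block.getHosts()));
    }
  }
}
----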

JT and Nanette’s workflow illustrates the second benefit of replication: being able to “move the compute to the data, not [expensively] moving the data to the compute.” Multiple replicas give the Job Tracker enough options that it can dependably assign most tasks to be “Mapper-local.”

A special node called the Namenode is responsible for distributing those blocks of data across the cluster. Like Nanette, the Namenode holds no data, only a sort of file allocation table (FAT), tracking for every file the checksum, the responsible Datanodes, and other essential characteristics of each of its blocks. The Namenode depends on the Datanodes to report in regularly. Every three seconds, each Datanode sends a heartbeat — a lightweight notification saying, basically, "I’m still here!". On a longer timescale, each Datanode prepares a listing of the replicas it sees on disk along with a full checksum of each replica’s contents. Having the Datanode contact the Namenode is a good safeguard that it is operating regularly and with good connectivity. Conversely, the Namenode uses the heartbeat response as its opportunity to issue commands to a struggling Datanode.

If, at any point, the Namenode finds a Datanode has not sent a heartbeat for several minutes, or if a block report shows missing or corrupted blocks, it will commission new copies of the affected blocks by issuing replication commands to other Datanodes as they heartbeat in.

A final prominent role the Namenode serves is to act as the public face of the HDFS. The ‘put’ and ‘get’ commands you just ran were Java programs that made network calls to the Namenode. There are API methods for the rest of the file system commands you would expect for use by that or any other low-level native client. You can also access its web interface, typically by visiting port 50070 (http://hostname.of.namenode:50070), which gives you the crude but effective ability to view its capacity, operational status and, for the very patient, inspect the contents of the HDFS.
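Your own programs can make the same calls those commands do. A minimal sketch of the programmatic equivalents of put and get, with hypothetical paths:

[source,java]
----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PutAndGet {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());   // network calls go to the Namenode

    // `hadoop fs -put shakespeare.txt /data/gold/` in API form:
    fs.copyFromLocalFile(new Path("shakespeare.txt"), new Path("/data/gold/shakespeare.txt"));

    // `hadoop fs -get /data/gold/shakespeare.txt copy.txt` in API form:
    fs.copyToLocalFile(new Path("/data/gold/shakespeare.txt"), new Path("copy.txt"));
  }
}
----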

Sitting behind the scenes is the often-misunderstood secondary Namenode; this is not, as its name implies and as you might hope, a hot standby for the Namenode. Unless you are using the "HA namenode" feature provided in later versions of Hadoop, if your Namenode goes down, your HDFS has gone down. All the secondary Namenode does is perform some essential internal bookkeeping. Apart from ensuring that it, like your Namenode, is always running happily and healthily, you do not need to know anything more about the secondary Namenode for now.

One last essential to note about the HDFS is that its contents are immutable. On a regular file system, every time you hit "save," the application modifies the file in place — on Hadoop, no such thing is permitted. This is driven by the necessities of distributed computing at high scale but it is also the right thing to do. Data analysis should proceed by chaining reproducible syntheses of new beliefs from input data. If the actions you are applying change, so should the output. This casual consumption of hard drive resources can seem disturbing to those used to working within the constraints of a single machine, but the economics of data storage are clear: storage costs $0.10 per GB per month at current commodity prices (one-tenth that for archival storage), while the analysts who will use the data cost at least $50 an hour.

Possibly the biggest rookie mistake made by those new to Big Data is a tendency to economize on the amount of data they store; we will try to help you break that habit. You should be far more concerned with the amount of data you send over the network or to your CPU than with the amount of data you store and most of all, with the amount of time you spend deriving insight rather than acting on it. Checkpoint often, denormalize when reasonable and preserve the full provenance of your results.

JT and Nanette at Work

JT and Nanette work wonderfully together — JT rambunctiously barking orders, Nanette peacefully gardening her card catalog — and subtly improve the efficiency of their team in a variety of ways. We’ll look closely at their bag of tricks later in the book (TODO ref) but here are two. The most striking thing any visitor to the worksite will notice is how calm everything is. One reason for this is Nanette’s filing scheme, which designates each book passage to be stored by multiple elephants. Nanette quietly advises JT of each passage’s location, allowing him to almost always assign his chimpanzees a passage held by the librarian in their cubicle. In turn, when an elephant receives a freshly-translated scroll, she makes two photocopies and dispatches them to two other cubicles. The hallways contain a stately parade of pygmy elephants, each carrying an efficient load; the only traffic consists of photocopied scrolls to store and the occasional non-cubicle-local assignment.

The other source of calm is on the part of their clients, who know that when Nanette’s on the job, their archives are safe — the words of Shakespeare will retain their eternal form. [2] To ensure that no passage is ever lost, the librarians on Nanette’s team send regular reports on the scrolls they maintain. If ever an elephant doesn’t report in (whether it stepped out for an hour or left permanently), Nanette identifies the scrolls designated for that elephant and commissions the various librarians who hold other replicas of those scrolls to make and dispatch fresh copies. Each scroll also bears a check of authenticity validating that photocopying, transferring its contents or even mouldering on the shelf has caused no loss of fidelity. Her librarians regularly recalculate those checks and include them in their reports, so if even a single letter on a scroll has been altered, Nanette can commission a new replica at once.

SIDEBAR: What’s Fast At High Scale

Throughput and Cost for Compute Primitives — the "Numbers Every Programmer Should Know"
Cost to Host and Serve One Billion 1kB Records (1 TB)

The table at the right (REF) summarizes the 2013 values for Peter Norvig’s "Numbers Every Programmer Should Know" — the length of time for each computation primitive on modern hardware. We’ve listed the figures several different ways: as latency (time to execute); as the number of 500-byte records that could be processed in an hour (TODO: day), if that operation were the performance bottleneck of your process; and as the amount of money to process one billion 500-byte records on commodity hardware. Big Data requires high-volume, high-throughput computing, so our principal bound is the speed at which data can be read from and stored to disk. What is remarkable is that, with the current state of technology, most of the other operations are slammed to one limit or the other: either bountifully unconstraining or devastatingly slow. That lets us write down the following "rules for performance at scale:"

  • High throughput programs cannot run faster than x (TODO: Insert number)

  • Data can be streamed to and from disk at x GB per hour (x records per hour, y records per hour, z dollars per billion records) (TODO: insert numbers)

  • High-throughput programs cannot run faster than that, but a well-written one should not run an order of magnitude slower, either.

  • Data streams over the network at the same rate as disk.

  • Memory access is infinitely fast.

  • CPU is fast enough to not worry about except in the obvious cases where it is not.

  • Random access (seeking to individual records) on disk is unacceptably slow.

  • Network requests for data (anything involving a round trip) are infinitely slow.

  • Disk capacity is free.

  • CPU and network transfer costs are cheap.

  • Memory is expensive and cruelly finite. For most tasks, available memory is either all of your concern or none of your concern.

Now that you know how Hadoop moves data around, you can use these rules to explain its remarkable scalability.

  1. Mapper streams data from disk and spills it back to disk; cannot go faster than that.

  2. In between, your code processes the data

  3. If you are unwinding proteins or multiplying matrices and are therefore CPU- or memory-bound, Hadoop at least will not get in your way; otherwise, the typical Hadoop job can process records as fast as they are streamed.

  4. Spilled records are sent over the network and spilled back to disk; again, cannot go faster than that.

That leaves the big cost of most Hadoop jobs: the midstream merge-sort. Spilled blocks are merged in several passes (at the Reducer and sometimes at the Mapper) as follows. Hadoop begins streaming data from each of the spills in parallel. Under the covers, what this means is that the OS is handing off the contents of each spill as blocks of memory in sequence. It is able to bring all its cleverness to bear, scheduling disk access to keep the streams continually fed as rapidly as each is consumed.

Hadoop’s actions are fairly straightforward. Since the spills are each individually sorted, at every moment the next (lowest ordered) record to emit is guaranteed to be the next unread record from one of its streams. It continues in this way, eventually merging each of its inputs into an unbroken output stream to disk. At no point does the Hadoop framework require a significant number of seeks on disk or requests over the network; the memory requirements (the number of parallel streams times the buffer size per stream) are manageable; and the CPU burden is effectively nil, so the merge/sort as well runs at the speed of streaming to disk.
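To see why the merge runs at streaming speed, consider a bare-bones k-way merge of already-sorted spills. This is a simplified sketch, not Hadoop’s actual merger: each spill is modeled as an in-memory sorted list, and a priority queue holds just the head record of each stream at any moment:

[source,java]
----
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Simplified k-way merge: each "spill" is an already-sorted list of records.
// At every step the smallest head-of-stream record is emitted, so the output is
// produced as one sequential stream -- no seeks, trivial CPU work, and memory
// proportional to (number of streams) x (one buffered record each).
public class SpillMergeSketch {
  public static List<String> merge(List<List<String>> sortedSpills) {
    PriorityQueue<StreamHead> heads =
        new PriorityQueue<>((a, b) -> a.record.compareTo(b.record));
    for (List<String> spill : sortedSpills) {
      Iterator<String> it = spill.iterator();
      if (it.hasNext()) heads.add(new StreamHead(it.next(), it));
    }

    List<String> merged = new ArrayList<>();
    while (!heads.isEmpty()) {
      StreamHead smallest = heads.poll();
      merged.add(smallest.record);                 // emit the lowest-ordered record
      if (smallest.rest.hasNext()) {               // refill from the same stream
        heads.add(new StreamHead(smallest.rest.next(), smallest.rest));
      }
    }
    return merged;
  }

  private static class StreamHead {
    final String record;
    final Iterator<String> rest;
    StreamHead(String record, Iterator<String> rest) { this.record = record; this.rest = rest; }
  }
}
----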

The Hadoop output phase may be more expensive than you think

As your Reducers emit records, they are streamed directly to the job output, typically the HDFS or S3. Since this occurs in parallel with reading and processing the data, the primary write to the local Datanode typically carries minimal added overhead. However, the data is simultaneously being replicated as well, which can extend your job’s runtime by more than you might think.

TODO-qem — a) does this section belong in "06a jsut enough performance for now" or here? b) I think this is a really good point to hit so want it to be really clear; apply extra criticism here.

Let’s consider how data flows in a job intended to remove duplicate records: for example, processing 100 GB of data with one-percent duplicates, and writing output with replication factor three. As you’ll see when we describe the 'distinct' patterns in Chapter 5 (REF), the Reducer input is about the same size as the mapper input. Using what you now know, Hadoop moves roughly the following amount of data, largely in parallel:

  • 100 GB of Mapper input read from disk;

  • 100 GB spilled back to disk;

  • 100 GB of Reducer input sent and received over the network;

  • 100 GB of Reducer input spilled to disk

  • some amount of data merge/sorted to disk if your cluster size requires multiple passes;

  • 100 GB of Reducer output written to disk by the local Datanode;

  • 200 GB of replicated output sent over the network, received over the network and written to disk by the Datanode.

If your Datanode is backed by remote volumes (common in some virtual environments [3]), you’ll additionally incur

  • 300 GB sent over the network to the remote file store

As you can see, unless your cluster is undersized (producing significant merge/sort overhead), the cost of replicating the data rivals the cost of the rest of the job. The default replication factor is 3 for two very good reasons: it helps guarantee the permanence of your data and it allows the JobTracker to efficiently allocate Mapper-local tasks. But in certain cases — intermediate checkpoint data, scratch data, or data backed by a remote file system with its own durability guarantee — an expert who appreciates the risk can choose to reduce the replication factor to 2 or 1.
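One way to do that for a job’s own output is to lower dfs.replication in the job’s configuration, so that files created by its tasks inherit the lower factor while gold data elsewhere keeps the full factor of three. A hedged sketch; whether this is appropriate depends entirely on your durability needs:

[source,java]
----
import org.apache.hadoop.mapred.JobConf;

public class ScratchOutputJobConf {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // ... mapper, reducer, input/output paths, job jar, etc. ...

    // Files created by this job's tasks (including its output) default to two
    // replicas instead of three.  Use only for checkpoint/scratch output.
    jobConf.set("dfs.replication", "2");
  }
}
----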

Humans are important, robots are cheap.

To store 10 Billion records with an average size of 1 kB — that’s 10 TB — it costs

  • $200,000 /month to store it all in RAM ($1315/mo for 150 68.4 GB machines)

  • $ 20,000 /month to have it 10% backed by RAM ($1315/mo for 15 68.4 GB machines)

  • $ 1,000 /month to store it on disk (EBS volumes)

Compare with

  • $ 1,600 /month salary of a part-time intern

  • $ 5,500 /month salary of a full-time junior engineer

  • $ 10,000 /month salary of a full-time senior engineer

For a 10-hour working day,

  • $ 270 /day for a 30-machine cluster having a total of 1TB ram, 120 cores

  • $ 650 /day for that same cluster if it runs for the full 24-hour day

  • $ 64 /day for a 10-machine cluster having a total of 150 GB ram, 40 cores

  • $ 180 /day for an intern to operate it

  • $ 300 /day for a junior engineer to operate it

  • $ 600 /day for a senior engineer to operate it

Suppose you have a job that runs daily, taking 3 hours on a 10-machine cluster of 15 GB machines. That job costs you $600/month.

If you tasked a junior engineer to spend three days optimizing that job, with a 10-machine cluster running the whole time, it would cost you about $1100. If she made the job run three times faster — so it ran in 1 hour instead of 3 — the job would now cost about $200. However, it will take three months to reach break-even.
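Spelling out that break-even arithmetic with the day rates listed above:

[latexmath]
++++
\begin{aligned}
\text{cost of optimizing} &\approx 3~\text{days} \times (\$300 + \$64)/\text{day} \approx \$1100 \\
\text{monthly savings} &\approx \$600 - \$200 = \$400/\text{month} \\
\text{break-even} &\approx \$1100 \div \$400/\text{month} \approx 2.75~\text{months} \approx 3~\text{months}
\end{aligned}
++++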

As a rule of thumb,

Size your cluster so that it is either almost-always-idle or healthily exceeds the opportunity cost to the humans working on it.

Takeaways:

  • Engineers are more expensive than compute.

  • Use elastic clusters for your data science team

Hadoop Job Execution Internals

  • Hadoop Job Execution

  • Lifecycle of a job at the client level including figuring out where all the source data is; figuring out how to split it; sending the code to the JobTracker, then tracking it to completion.

  • How the JobTracker and TaskTracker cooperate to run your job, including: the distinction between Job, Task and Attempt; how each TaskTracker obtains its Attempts and dispatches progress and metrics back to the JobTracker; how Attempts are scheduled, including what happens when an Attempt fails, and speculative execution; __; Split.

  • How the TaskTracker child and Datanode cooperate to execute an Attempt, including: what a child process is, making clear the distinction between TaskTracker and child process.

  • Briefly, how the Hadoop Streaming child process works.

Map-Reduce Internals

  • How the mapper and Datanode handle record splitting and how and when the partial records are dispatched.

  • The mapper sort buffer and spilling to disk (maybe here or maybe later, the I/O.record.percent).

  • Briefly note that data is not sent from mapper-to-reducer using HDFS and so you should pay attention to where you put the Map-Reduce scratch space and how stupid it is about handling an overflow volume.

  • Briefly, that combiners are a thing.

  • Briefly, how records are partitioned to reducers and that custom partitioners are a thing.

  • How the Reducer accepts and tracks its mapper outputs.

  • Details of the merge/sort (shuffle and sort), including the relevant buffers and flush policies and why it can skip the last merge phase.

  • (NOTE: Secondary sort and so forth will have been described earlier.)

  • Delivery of output data to the HDFS and commit whether from mapper or reducer.

  • Highlight the fragmentation problem with map-only jobs.

  • Where memory is used, in particular, mapper-sort buffers, both kinds of reducer-merge buffers, application internal buffers.

  • When using EBS volumes, beware of the commit & replication factor


1. Does that sound complicated? It is — Nanette is able to keep track of all those blocks, but if she calls in sick, nobody can get anything done. You do NOT want Nanette to call in sick.
2. When Nanette is not on the job, it’s a total meltdown — a story for much later in the book. But you’d be wise to always take extremely good care of the Nanettes in your life.
3. This may sound outrageous to traditional IT folk, but the advantages of elasticity are extremely powerful — we’ll outline the case for virtualized Hadoop in Chapter (REF)