Analytic Patterns: Map-only Operations

This chapter begins the 'Analytic Patterns' section of the book. In this chapter (and those beyond) we will walk you through a series of analytic patterns, with an example of each and a summary of when and where you might use them. As we go you will learn and accumulate new abilities in your analytic tool chest.

This chapter focuses exclusively on what we’ll call 'Map-only operations'. A map-only operation is one that can handle each record in isolation, like the translator chimps from Chimpanzee & Elephant Corp’s first job. That property makes those operations trivially parallelizable: they require no reduce phase of their own.

Technically, these operations can be run in the map or reduce phase of map/reduce. When a script has only map-only operations, they give rise to one mapper-only job which executes the composed pipeline stages.

All of these are listed first and together for two reasons. One, they are largely fundamental; it’s hard to get much done without FILTER or FOREACH. Two, the way you reason about the performance impact of these operations is largely the same. Since these operations are trivially parallelizable, they scale efficiently and the computation cost rarely impedes throughput. And when pipelined, their performance cost can be summarized as "kids eat free with purchase of adult meal". For datasets of any material size, it’s very rare that the cost of preliminary or follow-on processing rivals the cost of the reduce phase. Finally, since these operations handle records in isolation, their memory impact is modest. So learn to think of these together.

Pattern in Use

Blocks like the following will show up after each of the patterns or groups of patterns we cover. Not every field will be present every time, since there’s not always anything interesting to say.

  • Where You’ll Use It  — (The business or programming context.) Everywhere. Like the f-stop on your camera, composing a photo begins and ends with throttling its illumination.

  • Standard Snippet  — (Just enough of the code to remind you how it’s spelled.) somerecords = FILTER myrecords BY (criteria AND criteria …​);

  • Hello, SQL Users  — (A sketch of the corresponding SQL command, and important caveats for people coming from a SQL background.) SELECT bat_season.* FROM bat_season WHERE year_id >= 1900;

  • Important to Know  — (Caveats about its use. Things that you won’t understand / won’t buy into the first time through the book but will probably like to know later.)

    • Filter early, filter often. The best thing you can do with a large data set is make it smaller.

    • SQL users take note: ==, != — not = or anything else.

    • Programmers take note: AND, OR — not &&, ||.

  • Output Count  — (How many records in the output: fewer, same, more, explosively more?) Zero to 100% of the input record count. Data size will decrease accordingly

  • Records  — (A sketch of what the records coming out of this operation look like) Identical to input

  • Data Flow  — (The Hadoop jobs this operation gives rise to. In this chapter, all the lines will look like this one; in the next chapters that will change) Map-Only: it’s composed onto the end of the preceding map or reduce, and if it stands alone becomes a map-only job.

  • Exercises for You  — (A mission to carry forward, if you choose. Don’t go looking for an answer section — we haven’t done any of them. In many cases you’ll be the first to find the answer.) Play around with `null`s and the conditional operators until you have a good sense of their quirks.

  • See Also  — (Besides the patterns in its section of the book, what other topics might apply if you’re considering this one? Sometimes this is another section in the book, sometimes it’s a pointer elsewhere) The Distinct operations, some Set operations, and some Joins are also used to eliminate records according to some criteria. See especially the Semi-Join and Anti-Join (REF), which select or reject matches against a large list of keys.

Eliminating Data

The first round of patterns will focus on methods to shrink your dataset. This may sound counterintuitive to the novice ear: isn’t the whole point of "Big Data" that we get to work with the entire dataset at once? We ultimately develop models based on the entire population, not a sample thereof, so why should we scale down our data?

The primary reason is to focus on a subset of records: only website requests with an external referrer, only security events with high threat levels, only accounts of more than $1 million. And even when you work with every record in a dataset, you may be interested in a subset of fields relevant to your research. For reasons of memory and computational efficiency, and also your sanity, you’d do yourself a favor to immediately trim a working dataset down to just those records and fields relevant to the task at hand. Last, but not least, you may want to draw a random sample just to spot-check a dataset when it’s too computationally expensive to inspect every element.

Working with a subset of data also simplifies debugging, and it plays to our favorite refrain: know your data. If you’re working on a dataset and there are additional fields or records you don’t plan to use, can you be certain they won’t somehow creep into your model? The worst-case scenario here is what’s called a feature leak, wherein your target variable winds up in your training data. In essence: imagine saying you can predict today’s high temperature, so long as you are first provided today’s high temperature. A feature leak can lead to painful surprises when you deploy this model to the real world.

Furthermore, you may wish to test some code on a small sample before unleashing it on a long-running job. This is generally a good habit to develop, especially if you’re one to kick off jobs before leaving the office, going to bed, or boarding a long-haul flight.

The goal, of course, isn’t to eliminate data; it’s to be selective about your data, and so we will introduce you to a variety of techniques for doing so.

Selecting Records that Satisfy a Condition: FILTER and Friends

The first step to eliminating (or being selective about) data is to reject records that don’t match certain criteria. Pig’s FILTER statement does this for you. It doesn’t remove the data — all data in Hadoop and thus Pig is immutable — rather, like all Pig operations, it creates a new table that omits certain records from the input.

The baseball stats go back to 1871 (!), but it took a few decades for the game to reach its modern form. Let’s say we’re only interested in seasons since 1900. In Pig, we apply the FILTER operation [1]:

Pig - Filter Data (ch_05/filtering_data.pig)
modern_bats = FILTER bats BY (year_id >= 1900);

The range of conditional expressions you’d expect is present: == (double-equals) to express an equality condition, != for not-equals, and >, >=, <, <= for inequalities; IN for presence in a list; and MATCHES for string pattern matching.
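
For a quick sketch of how each of these looks in a FILTER (the team_id and player_id fields here are our assumptions about the bats table’s schema; the others appear in the snippets nearby):

modern      = FILTER bats BY (year_id >= 1900);          -- inequality
al_only     = FILTER bats BY (lg_id == 'AL');            -- equality
not_yankees = FILTER bats BY (team_id != 'NYA');         -- not-equals
either_lg   = FILTER bats BY lg_id IN ('AL', 'NL');      -- membership in a list
s_players   = FILTER bats BY (player_id MATCHES 's.*');  -- string pattern match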

Selecting Records that Satisfy Multiple Conditions

In a data exploration, it’s often important to exclude subjects with sparse data, either to eliminate small-sample-size artifacts, or because they are not in the focus of interest. In our case, we will often want to restrict analysis to regular players — those who have seen significant playing time in a season — while allowing for injury or situational replacement. Since major-league players come to bat a bit over 4 times a game on average in a season of 154 to 162 games (it increased in 1960), we can take 450 plate appearances (roughly 2/3 of the maximum) as our threshold [2].

In Pig, you can also combine conditional statements with AND, OR, NOT. The following selects what we’ll call "qualified modern seasons": regular players, competing in the modern era, in either of the two modern leagues.

Filter Data by Multiple Conditions (ch_05/filtering_data.pig)
modsig_stats = FILTER bats BY
  (PA >= 450) AND (year_id >= 1900) AND ((lg_id == 'AL') OR (lg_id == 'NL'));

Selecting or Rejecting Records with a null Value

Another table we’ll be working with is the people table. It describes players' vital statistics: their name; where and when they were born and died; when their career started and ended; their height and weight; and so forth. The data is quite comprehensive, but in some cases the fields have null values. Nulls are used in practice for many things:

  • Missing/unknown Value — the case for a fraction of early players' birthplaces or birth dates

  • No Value Applies — players who are still alive have null in the fields for date and location of death

  • Ill-formed Value — if a corrupt line creates an unparseable cell (e.g. a value of 'Bob' for an int), Pig will write a warning to the log but otherwise load it without complaint as null.

  • Illegal value — Division by zero and similar misbehavior results in a null value (and not an error, warning, or log statement)

  • "Other" — People will use a null value in general to represent "it’s complicated, but maybe some other field has details".

We can exclude players whose birth year or birth place is unknown with a FILTER statement:

Filtering Nulls (ch_05/filtering_data.pig)
borned = FILTER people BY (birth_year IS NOT NULL) AND (birth_place IS NOT NULL);

For those coming from a SQL background, Pig’s handling of null values will be fairly familiar. For the rest of us, good luck. Null values generally disappear without notice from operations, and generally compare as null (which signifies neither false nor true). And so null is not less than 5.0, it is not greater than 5.0, and it is not equal to 5.0. A null value is not equal to null, and is not unequal to null. You can see why programmers can find all this hard to keep track of. The full fiddly collection of rules is well detailed in the Pig manual, so we won’t go deep into them here — we’ve found the best way to learn what you need is to just see lots of examples, which we endeavor to supply in abundance.
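
Here is a minimal sketch of how those rules play out in a FILTER, using the death_year field of the people table. Records whose death_year is null fail both comparisons, so they silently vanish from both relations unless you select them by name:

died_before_1950 = FILTER people BY (death_year <  1950);  -- nulls excluded
died_since_1950  = FILTER people BY (death_year >= 1950);  -- nulls excluded here too
death_unknown    = FILTER people BY (death_year IS NULL);  -- the only way to keep the nulls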

Selecting Records that Match a Regular Expression (MATCHES)

A MATCHES expression employs regular expression pattern matching against string values. Regular expressions are given as plain chararray strings; there’s no special syntax, as Python/Ruby/Perl/etc-ists might have hoped. See the sidebar (REF) for important details and references that will help you master this important tool.

This operation uses a regular expression to select players with names similar to either of your authors' names:

Filtering via Regular Expressions (ch_05/filtering_data.pig)
-- Name is `Russ`, or `Russell`; is `Flip` or anything in the Philip/Phillip/... family.
-- (?i) means be case-insensitive:
namesakes = FILTER people BY (name_first MATCHES '(?i).*(russ|russell|flip|phil+ip).*');

It’s easy to forget that people’s names can contain spaces, dots, dashes, apostrophes; start with lowercase letters or apostrophes, and have accented or other non-latin characters [3]. So as a less silly demonstration of MATCHES, this snippet extracts all names which do not start with a capital letter or which contain a non-word non-space character:

Filtering via Regular Expressions (ch_05/filtering_data.pig)
funnychars = FILTER people BY (name_first MATCHES '^([^A-Z]|.*[^\\w\\s]).*');

There are many players with non-word, non-space characters, but none whose names are represented as starting with a lowercase character. However, in early drafts of the book this query caught a record with the value "name_first" — the header rows from a source datafile had contaminated the table. Sanity checks like these are a good idea always, even more so in Big Data. When you have billions of records, a one-in-a-million exception will appear thousands of times.

Important Notes about String Matching

Regular expressions are incredibly powerful and we urge all readers to acquire basic familiarity. There is no better path to mastery than the regexp.info website, and we’ve provided a brief cheatsheet at the end of the book (REF). Here are some essential clarifications about Pig in particular:

  • Regular expressions in Pig are supplied to the MATCHES operator as plain strings. A single backslash is consumed by the string literal and does not appear in the string sent to the regexp engine. To pass along the shorthand [^\\w\\s] (non-word non-space characters), we have to use two backslashes.

  • Yes, that means matching a literal backslash in the target string is done with four backslashes: \\\\!

  • Options for matching are supplied within the string. For example, (?i) matches without regard to case (as we did above), (?m) to do multi-line matches, and so forth — see the documentation.

  • Pig Regular Expressions are implicitly anchored at the beginning and end of the string, the equivalent of adding ^ at the start and $ at the end. (This mirrors Java but is unlike most other languages.) Use .* at both ends, as we did above, to regain the conventional "match anywhere in the string" behavior. Supplying an explicit ^ or $ when that’s what you intend is a good habit for readability.

  • MATCHES is an expression, like AND or == — you write str MATCHES regexp. The other regular expression mechanisms you’ll meet are functions — you write REGEX_EXTRACT(str, regexp, 1). You will forget we told you so the moment you finish this book.

  • Appearing in the crop of results: Peek-A-Boo Veach, Quincy Trouppe, and Flip Lafferty.

  • You’re allowed to have the regular expression be a value from the record, though Pig is able to pre-compile a constant (literal) regexp string for a nice speedup.

  • Pig doesn’t offer an exact equivalent to SQL’s LIKE operator for simple wildcard matching. The rough equivalents are dot-star (.*) for the SQL % (zero or more arbitrary characters), dot (.) for the SQL _ (a single character), and square brackets (e.g. [a-z]) for a character range, similar to SQL; see the sketch just after this list.

  • The string equality expression is case sensitive: 'Peek-A-Boo' does not equal 'peek-a-boo'. For case-insensitive string matching, use the EqualsIgnoreCase function: EqualsIgnoreCase('Peek-A-Boo', 'peek-a-boo') is true. This simply invokes Java’s String.equalsIgnoreCase() method and does not support regular expressions.
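
As a rough sketch of that conversion (field names from the people table; the SQL lines are shown only for comparison):

-- SQL: WHERE name_last LIKE 'Mc%'    (% matches zero or more characters)
mc_names = FILTER people BY (name_last MATCHES 'Mc.*');
-- SQL: WHERE name_last LIKE 'Sm_th'  (_ matches exactly one character)
smth_ish = FILTER people BY (name_last MATCHES 'Sm.th');
-- Pig patterns are anchored at both ends, so 'Mc.*' means "starts with Mc";
-- write '.*Mc.*' if you mean "contains Mc anywhere".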

Note
Sadly, the Nobel Prize-winning physicists Gerard 't Hooft, Louis-Victor Pierre Raymond de Broglie, or Tomonaga Shin’ichirō never made the major leagues. Or tried out, as far as we know. But their names are great counter-examples to keep in mind when dealing with names. Prof de Broglie’s full name is 38 characters long, has a last name that starts with a lowercase letter, and is non-trivial to segment. "Tomonaga" is a family name, though it comes first. You’ll see Prof. Tomonaga’s name given variously as "Tomonaga Shin’ichirō", "Sin-Itiro Tomonaga", or "朝永 振一郎", each one of them correct, and the others not, depending on context. Prof. 't Hooft's last name starts with an apostrophe, a lower-case-letter, and contains a space. You’re well advised to start a little curio shelf in your workshop for counterexample collections such as these, and we’ll share some of ours throughout the book.
Pattern in Use

Note to production: Please ensure this comes WITH the preceding header/section, and not after the 'Important Notes on String Matching' sidebar.

  • Where You’ll Use It  — Wherever you need to select records by a string field. For selecting against small lists. For finding ill-formed records. Matching against a subsection of a composite key — Can you figure out what game_id MATCHES '…​(19|20).*' in the games table does?

  • Standard Snippet  — FILTER recs BY (str MATCHES '.*pattern.*'), sure, but also FOREACH recs GENERATE (str MATCHES '.*(kitty|cat|meow).*' ? 'cat' : 'notcat') AS catness.

  • Hello, SQL Users  — Similar to but more powerful than the LIKE operator. See the sidebar (ref) for a conversion guide.

  • Important to Know  — 

    • Mostly, that these are incredibly powerful, and even if they seem arcane now they’re much easier to learn than it first seems.

    • You’re far better off learning one extra thing to do with a regular expression than most of the other string conditional functions Pig offers.

    • …​ and enough other Importants to Know that we made a sidebar of them (REF).

  • Records  — You can use this in a filter clause but also anywhere else an expression is permitted, like the preceding snippet

  • Data Flow  — Map-Only: it’s composed onto the end of the preceding map or reduce, and if it stands alone becomes a map-only job.

  • Exercises for You  — Follow the regexp.info tutorial, but only up to the part on Grouping & Capturing. The rest you are far better off picking up once you find you need it.

  • See Also  — The Pig REGEX_EXTRACT and REPLACE functions. Java’s Regular Expression documentation for details on its peccadilloes (but not for an education about regular expressions).

Matching Records against a Fixed List of Lookup Values

If you plan to filter by matching against a small static list of values, Pig offers the handy IN expression: true if the value is equal (case-sensitive) to any of the listed values. This selects the stadiums used each year by the current teams in baseball’s AL-east division:

Pig - Filtering Against a List of Values (ch_05/filtering_data.pig)
al_east_parks = FILTER park_team_years BY
  team_id IN ('BAL', 'BOS', 'CLE', 'DET', 'ML4', 'NYA', 'TBA', 'TOR', 'WS2');

Sometimes a regular expression alternative can be the right choice instead. bubba MATCHES 'shrimp (kabobs|creole|gumbo|soup|stew|salad|and potatoes|burger|sandwich)' OR bubba MATCHES '(pineapple|lemon|coconut|pepper|pan.fried|deep.fried|stir.fried) shrimp' is more readable than bubba IN ('shrimp kabobs', 'shrimp creole', 'shrimp gumbo', …​).

When the list grows somewhat larger, an alternative is to read it into a set-membership data structure [4], but ultimately large data sets belong in data files.

The general case is handled by using a join, as described in the next chapter (REF) under "Selecting Records Having a Match in Another Table (semi-join)". See in particular the specialized merge join and HashMap (replicated) join, which can offer a great speedup if you meet their qualifications. Finally, you may find yourself with an extremely large table but with few elements expected to match. In that case, a Bloom Filter may be appropriate. They’re discussed more in the statistics chapter, where we use a Bloom Filter to match every phrase in a large document set against a large list of place names, effectively geolocating the documents.

Pattern in Use
  • Where You’ll Use It  — File types or IP addresses to select/reject from web logs. Keys for exemplar records you’re tracking through a dataflow. Stock symbols you’re researching. Together with "Summarizing Multiple Subsets of a Group Simultaneously" (REF), enumerate members of a cohort ((state IN ('CA', 'WA', 'OR') ? 1 : 0) AS is_western, …​).

  • Standard Snippet  — foo IN ('this', 'that', 'the_other'), or any of the other variants given above

  • Hello, SQL Users  — This isn’t anywhere near as powerful as SQL’s IN expression. Most importantly, you can’t supply another table as the list.

  • Important to Know  — A regular expression alternation is often the right choice instead.

  • Output Count  — Zero to 100% of the input record count: however many records match one of the listed values. Data size should decrease greatly.

  • Data Flow  — Map-Only: it’s composed onto the end of the preceding map or reduce, and if it stands alone becomes a map-only job.

Project Only Chosen Columns by Name

While a FILTER selects rows based on an expression, Pig’s FOREACH selects specific fields chosen by name. The fancy word for this simple action is "projection". We’ll try to be precise in using project for choosing columns, select for choosing rows by any means, and filter where we specifically mean selecting rows that satisfy a conditional expression.

The tables we’re using come with an overwhelming wealth of stats, but we only need a few of them to do fairly sophisticated explorations. The gamelogs table has more than 90 columns; to extract just the teams and the final score, use a FOREACH:

Projecting Only Chosen Columns by Name (ch_05/data_elimination.pig)
game_scores = FOREACH games GENERATE
  away_team_id, home_team_id, home_runs_ct, away_runs_ct;

Using a FOREACH to Select, Rename and Reorder fields

You’re not limited to simply restricting the number of columns; you can also rename and reorder them in a projection. Each record in the table above has two game outcomes, one for the home team and one for the away team. We can represent the same data in a table listing outcomes purely from each team’s perspective:

Altering Column Names (ch_05/data_elimination.pig)
games_a = FOREACH games GENERATE
  home_team_id AS team,     year_id,
  home_runs_ct AS runs_for, away_runs_ct AS runs_against, 1 AS is_home:int;

games_b = FOREACH games GENERATE
  away_team_id AS team,     year_id,
  away_runs_ct AS runs_for, home_runs_ct AS runs_against, 0 AS is_home:int;

team_scores = UNION games_a, games_b;

DESCRIBE team_scores;
--   team_scores: {team: chararray,year_id: int,runs_for: int,runs_against: int,is_home: int}

The first projection puts the home_team_id into the team slot, renaming it team; retains the year_id field unchanged; and files the home and away scores under runs_for and runs_against. Lastly, we slot in an indicator field for home games, supplying both the name and type as a matter of form. Next we generate the corresponding table for away games, then stack them together with the UNION operation (to which you’ll be properly introduced in a few pages). All the tables have the identical schema shown, even though their values come from different columns in the original tables.

Pattern in Use
  • Where You’ll Use It  — Nearly everywhere. If FILTER is the f-stop of our camera, this is the zoom lens.

  • Standard Snippet  — FOREACH recs GENERATE only, some, columns;

  • Important to Know  — As you can see, we take a lot of care visually aligning subexpressions within the code snippets. That’s not because we’ve tidied up the house for students coming over — this is what the code we write and the code our teammates expect us to write looks like.

  • Output Count  — Exactly the same as the input.

  • Records  — However you define them to be

  • Data Flow  — Map-Only: it’s composed onto the end of the preceding map or reduce, and if it stands alone becomes a map-only job.

  • See Also  — "Assembling Literals with Complex Type" (REF)

Extracting a Random Sample of Records

Another common operation is to extract a uniform sample — one where every record has an equivalent chance of being selected. For example, you could use this to test new code before running it against the entire dataset (and possibly having a long-running job fail due to a large number of mis-handled records). By calling the `SAMPLE` operator, you ask Pig to pluck out some records at random.

The following Pig code will return a randomly-selected 10% (that is, 1/10 = 0.10) of the records from our baseball dataset:

Sampling Data (ch_05/data_elimination.pig)
some_seasons_samp = SAMPLE bat_seasons 0.10;

The SAMPLE operation selects records by generating a random number for each one, which means each run of a script that uses SAMPLE will yield a different set of records. Sometimes this is what you want, or at the very least, you don’t mind. In other cases, you may want to draw a uniform sample once, then repeatedly work through those same records. (Consider our example of spot-checking new code against a dataset: you’d need to run your code against the same sample in order to confirm your changes work as expected.)

Experienced software developers will reach for a "seeding" function — such as R’s set.seed() or Python’s random.seed() — to make the randomness a little less so. At the moment, Pig does not have an equivalent function. Even worse, it is not consistent within the task — if a map task fails on one machine, the retry attempt will generate different data sent to different reducers. This rarely causes problems, but for anyone looking to contribute back to the Pig project, this is a straightforward high-value issue to tackle.

Pattern in Use
  • Where You’ll Use It  — At the start of the exploration, to cut down on data size. In many machine learning algorithms. Don’t use it for simulations — you need to be taking aggressive charge of the sampling algorithm.

  • Important to Know

    • A consistent sample is a much better practice, though we admit that can be more of a hassle. But records that dance around mean you can’t Know Thy Data as you should.

    • The DataFu package has UDFs for sampling with replacement and other advanced features.

  • Output Count  — Determined by the sampling fraction. As a rule of thumb, variances of things are square-root-ish; expect the size of a 10% sample to be in the 7%-13% range.

  • Records  — Identical to the input

  • Data Flow  — Map-Only: it’s composed onto the end of the preceding map or reduce, and if it stands alone becomes a map-only job.

  • Exercises for You  — Modify Pig’s SAMPLE function to accept a seed parameter, and submit that patch back to the open-source project. This is a bit harder to do than it seems: sampling is key to efficient sorting and so the code to sample data is intertwingled with a lot of core functionality.

Extracting a Consistent Sample of Records by Key

A good way to stabilize the sample from run to run is to use a 'consistent hash digest'. A hash digest function creates a fixed-length fingerprint of a string whose output is otherwise unpredictable from the input and uniformly distributed — that is, you can’t tell which string the function will produce except by computing the digest, and every string is equally likely. For example, the hash function might give the hexadecimal-string digest 3ce3e909 for 'Chimpanzee' but 07a05f9c for 'Chimp'. Since all hexadecimal strings have effectively equal likelihood, one-sixteenth of them will start with a zero, and so this filter would reject Chimpanzee but select Chimp.

Unfortunately, Pig doesn’t have a good built-in hash digest function! Do we have to give up all hope? You’ll find the answer later in the chapter (REF) [5], but for now instead of using a good built-in hash digest function let’s use a terrible hash digest function. A bit under 10% of player_ids start with the letter 's', and any coupling between a player’s name and performance would be far more subtle than we need to worry about. So the following simple snippet gives a 10% sample of batting seasons whose behavior should reasonably match that of the whole:

Extracting a Consistent Sample (ch_05/data_elimination.pig)
some_seasons  = FILTER bat_seasons BY (SUBSTRING(player_id, 0, 1) == 's');

We called this a terrible hash function, but it does fit the bill. When applied to an arbitrary serial identifier it’s not terrible at all — the Twitter firehose provides a 1% service tier which returns only tweets from users whose numeric ID ends in '00', and a 10% tier with user IDs ending in 0. We’ll return to the subject with a proper hash digest function later on in the chapter, once you’re brimming with even more smartitude than you are right now.

  • Where You’ll Use It  — At the start of the exploration, to cut down on data size while keeping the sample stable from run to run.

  • Important to Know

    • If you’ll be spending a bunch of time with a data set, using any kind of random sample to prepare your development sample might be a stupid idea. You’ll notice that Red Sox players show up a lot of times in our examples — that’s because our development samples are "seasons by Red Sox players" and "seasons from 2000-2010", which lets us make good friends with the data.

  • Output Count  — Determined by the sampling fraction. As a rule of thumb, variances of things are square-root-ish; expect the size of a 10% sample to be in the 7%-13% range.

  • Records  — Identical to the input

  • Data Flow  — Map-Only: it’s composed onto the end of the preceding map or reduce, and if it stands alone becomes a map-only job.

Sampling Carelessly by Only Loading Some part- Files

Sometimes you just want to knock down the data size while developing your script, and don’t much care about the exact population. If you find a prior stage has left you with 20 files part-r-00000 through part-r-00019, specifying part-r-0000[01] (the first two out of twenty files) as the input to the next stage is a hamfisted but effective way to get a 10% sample. You can cheat even harder by adjusting the parallelism of the preceding stage to get you the file granularity you need. As long as you’re mindful that some operations leave the reducer with a biased selection of records, toggling back and forth between say my_data/part-r-0000[01] (two files) and my_data/ (all files in that directory) can really speed up development.
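
A hamfisted sketch of what that looks like (the path and schema are placeholders, not part of the running example):

-- Roughly a 10% sample: load just the first two of twenty part files.
rough_sample = LOAD '/tmp/my_data/part-r-0000[01]' AS (key:chararray, val:int);
-- Point the LOAD back at the whole directory when you want every record:
-- all_records = LOAD '/tmp/my_data/' AS (key:chararray, val:int);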

Selecting a Fixed Number of Records with LIMIT

A much blunter way to create a smaller dataset is to take some fixed number 'K' of records. Pig offers the LIMIT operator for this purpose. To select 25 records from our bat_seasons data, you would run:

Selecting N Arbitrary Records (ch_05/data_elimination.pig)
some_players = LIMIT bat_seasons 25;

This is somewhat similar to running the head command in Unix-like operating systems, or using the LIMIT clause in a SQL SELECT statement. However, unless you have explicitly imparted some order to the table (probably by sorting it with ORDER, which we’ll cover later (REF)), Pig gives you no guarantee over which records it selects. In the big data regime, where your data is striped across many machines, there’s no intrinsic notion of a record order. Changes in the number of mappers or reducers, in the data, or in the cluster may change which records are selected. In practice, you’ll find that it takes the first 'K' records of the first-listed file (and so, as opposed to SAMPLE, generally gives the same outcome run-to-run), but it’s irresponsible to rely on that.

When you have a very large dataset, as long as you really just need any small piece of it, you can apply the previous trick as well and just specify a single input file. Invoking LIMIT on one file will prevent a lot of trivial map tasks from running.
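
Putting the two tricks together might look like this sketch (again with a placeholder path and schema):

-- LIMIT against a single part file, so only one trivial map task needs to run.
one_file = LOAD '/tmp/my_data/part-r-00000' AS (key:chararray, val:int);
few_recs = LIMIT one_file 25;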

Other Data Elimination Patterns

There are two tools we’ll meet in the next chapter that can be viewed as data elimination patterns as well. The DISTINCT and related operations are used to identify duplicated or unique records. Doing so requires putting each record in context with its possible duplicates — meaning they are not pure pipeline operations like the others here. Above, we gave you a few special cases of selecting records against a list of values. We’ll see the general case — selecting records having or lacking a match in another table, also known as semi-join and anti-join — when we meet all the flavors of the JOIN operation in the next chapter.

Transforming Records

Besides getting rid of old records, the second-most exciting thing to do with a big data set is to rip through it manufacturing new records [6]. We’ve been quietly sneaking FOREACH into snippets, but it’s time to make its proper acquaintance.

Transform Records Individually using FOREACH

The FOREACH statement lets you apply simple transformations to each record. It’s the most versatile Pig operation and the one you’ll spend the most time using.

To start with a basic example, this FOREACH statement combines the fields giving the city, state and country of birth for each player into the familiar comma-space separated combined form (Austin, TX, USA) [7].

FOREACH Example One (ch_05/foreach.pig)
birthplaces = FOREACH people GENERATE
    player_id,
    StringConcat(birth_city, ', ', birth_state, ', ', birth_country) AS birth_loc
    ;

The syntax should be largely self-explanatory: this runs through the people table, and outputs a table with two columns, the player ID and our synthesized string. In the output you’ll see that when StringConcat encounters records with null values, it returns null as well, without an error.

For the benefit of SQL aficionados, here’s an equivalent SQL query:

SELECT
    player_id,
    CONCAT(birth_city, ', ', birth_state, ', ', birth_country) AS birth_loc
  FROM people;

You’ll recall we took some care when loading the data to describe the table’s schema, and Pig makes it easy to ensure that the data continues to be typed. Run DESCRIBE birthplaces; to return the schema:

birthplaces: {player_id: chararray,birth_loc: chararray}

Since player_id carries through unchanged, its name and type carry over to the new schema. Pig figures out that the result of CONCAT is a chararray, but it’s up to us to award it a new name (birth_loc).

A FOREACH won’t cause a new Hadoop job stage: it’s chained onto the end of the preceding operation (and when it’s on its own, like this one, there’s just a single mapper-only job). It always produces exactly the same count of output records as input records, although as you’ve seen it can change the number of columns.

A nested FOREACH Allows Intermediate Expressions

Earlier we promised you a storyline in the form of an extended exploration of player performance. We’ve now gathered enough tactical prowess to set out [8].

The stats in the bat_seasons table are all "counting stats" — total numbers of hits, of games, and so forth — and certainly from the team’s perspective the more hits the better. But for comparing players, the counting stats don’t distinguish between the player who earned 70 hits in a mere 200 trips to the plate before a season-ending injury, and the player who squandered 400 of his team’s plate appearances getting to a similar total [9]. We should also form "rate stats", normalizing those figures against plate appearances. The following simple metrics do quite a reasonable job of characterizing players' performance:

  • 'On-base percentage' (OBP) indicates how well the player meets offensive goal #1: get on base, thus becoming a potential run and not consuming a precious out. It is given as the fraction of plate appearances that are successful: ((H + BB + HBP) / PA) [10]. An OBP over 0.400 is very good (better than 95% of significant seasons).

  • 'Slugging Percentage' (SLG) indicates how well the player meets offensive goal #2: advance the runners on base, thus converting potential runs into points towards victory. It is given by the total bases gained in hitting (one for a single, two for a double, etc) divided by the number of at bats: (TB / AB, where TB := (H + h2B + 2*h3B + 3*HR)). An SLG over 0.500 is very good.

  • 'On-base-plus-slugging' (OPS) combines on-base and slugging percentages to give a simple and useful estimate of overall offensive contribution. It’s found by simply adding the figures: (OBP + SLG). Anything above 0.900 is very good.

Doing this with the simple form of FOREACH we’ve been using would be annoying and hard to read — for one thing, the expressions for OBP and SLG would have to be repeated in the expression for OPS, since the full statement is evaluated together. Pig provides a fancier form of FOREACH (a 'nested' FOREACH) that allows intermediate expressions:

Nested FOREACH (ch_05/foreach.pig)
bat_seasons = FILTER bat_seasons BY PA > 0 AND AB > 0;
core_stats  = FOREACH bat_seasons {
  TB   = h1B + 2*h2B + 3*h3B + 4*HR;
  OBP  = 1.0f*(H + BB + HBP) / PA;
  SLG  = 1.0f*TB / AB;
  OPS  = SLG + OBP;
  GENERATE
    player_id, name_first, name_last,   --  $0- $2
    year_id,   team_id,   lg_id,        --  $3- $5
    age,  G,   PA,  AB,   HBP, SH,  BB, --  $6-$12
    H,    h1B, h2B, h3B,  HR,  R,  RBI, -- $13-$19
    SLG, OBP, OPS;                      -- $20-$22
};

This alternative curly-braces form of FOREACH lets you describe its transformations in smaller pieces, rather than smushing everything into the single GENERATE clause. New identifiers within the curly braces (such as TB or OBP) only have meaning within those braces, but they do inform the schema.

You’ll notice that we multiplied by 1.0 while calculating OBP and SLG. If all the operands were integers, Pig would use integer arithmetic; instead of fractions between 0 and 1, the result would always be integer 0. Multiplying by the floating-point value 1.0 forces Pig to use floating-point math, preserving the fraction. Using a typecast — SLG = (float)TB / AB — as described below is arguably more efficient but inarguably uglier. The above is what we’d write in practice.

By the way, the filter above is sneakily doing two things. It obviously eliminates records where PA is equal to zero, but it also eliminates records where PA is null. (See the section "Selecting or Rejecting Records with null Values" (REF) above for details.)

In addition to applying arithmetic expressions and functions, there are a set of operations (ORDER, DISTINCT, FOREACH, FILTER, LIMIT) you can apply to bags within a nested FOREACH. We’ll wait until the section on grouping operations to introduce their nested-foreach ("inner bag") forms.

Formatting a String According to a Template

The SPRINTF function is a great tool for assembling a string for humans to look at. It uses the printf-style templating convention common to C and many other languages to assemble strings with consistent padding and spacing. It’s best learned by seeing it in action:

Formatting Strings (ch_05/foreach.pig)
formatted = FOREACH bat_seasons GENERATE
  SPRINTF('%4d\t%-9s %-20s\tOBP %5.3f / %-3s %-3s\t%4$012.3e',
    year_id,  player_id,
    CONCAT(name_first, ' ', name_last),
    1.0f*(H + BB + HBP) / PA,
    (year_id >= 1900 ? '.'   : 'pre'),
    (PA >= 450       ? 'sig' : '.')
  ) AS OBP_summary:chararray;

So you can follow along, here are some scattered lines from the results:

1954    aaronha01 Hank Aaron            OBP 0.318 / .   sig     0003.183e-01
1897    ansonca01 Cap Anson             OBP 0.372 / pre sig     0003.722e-01
1970    carewro01 Rod Carew             OBP 0.407 / .   .       0004.069e-01
1987    gwynnto01 Tony Gwynn            OBP 0.446 / .   sig     0004.456e-01
2007    pedrodu01 Dustin Pedroia        OBP 0.377 / .   sig     0003.769e-01
1995    vanlawi01 William Van Landingham        OBP 0.149 / .   .       0001.489e-01
1941    willite01 Ted Williams          OBP 0.553 / .   sig     0005.528e-01

The parts of the template are as follows:

  • %4d: render an integer, right-aligned, in a four character slot. All the year_id values have exactly four characters, but if Pliny the Elder’s rookie season from 43 AD showed up in our dataset, it would be padded with two spaces: ` 43`. Writing %04d (i.e. with a zero after the percent) causes zero-padding: 0043.

  • \t (backslash-t): renders a literal tab character. This is done by Pig, not in the SPRINTF function.

  • %-9s: a nine-character string. Like the next field, it …​

  • %-20s: has a minus sign, making it left-aligned. You usually want this for strings.

    • We prepared the name with a separate CONCAT statement and gave it a single string slot in the template, rather than using say %-8s %-11s. In our formulation, the first and last name are separated by only one space and share the same 20-character slot. Try modifying the script to see what happens with the alternative.

    • Any value shorter than its slot width is padded to fit, either with spaces (as seen here) or with zeros (as seen in the last field). A value longer than the slot width is not truncated — it is printed at full length, shifting everything after it on the line out of place. When we chose the 20-character width, we didn’t count on William Van Landingham’s corpulent cognomen contravening character caps, correspondingly corrupting columnar comparisons. Still, that only messes up Mr. Van Landingham’s line — subsequent lines are unaffected.

  • OBP: Any literal text you care to enter just carries through. In case you’re wondering, you can render a literal percent sign by writing %%.

  • %5.3f: for floating point numbers, you supply two widths. The first is the width of the full slot, including the sign, the integer part, the decimal point, and the fractional part. The second number gives the width of the fractional part. A lot of scripts that use arithmetic to format a number to three decimal places (as in the prior section) should be using SPRINTF instead.

  • %-3s %-3s: strings indicating whether the season is pre-modern (before 1900) and whether it is significant (>= 450 PA). We could have used true/false, but doing it as we did here — one value tiny, the other with visual weight — makes it much easier to scan the data.

    • By inserting the / delimiter and using different phrases for each indicator, it’s easy to grep for matching lines later — grep -e '/.*sig' — without picking up lines having 'sig' in the player id.

  • %4$012.3e: Two things to see here:

    • Each of the preceding has pulled its value from the next argument in sequence. Here, the 4$ part of the specifier uses the value of the fourth non-template argument (the OBP) instead.

    • The remaining 012.3e part of the specifier says to use scientific notation, with three decimal places and twelve total characters. Since the values don’t reach full width, they are left-padded with zeros. When you’re calculating the width of a scientific notation field, don’t forget to include the two sign characters: one for the number and one for the exponent.

We won’t go any further into the details, as the SPRINTF function is well documented and examples of printf-style templating abound on the web. But this is a useful and versatile tool, and if you’re able to mimic the elements used above you understand its essentials.

Assembling Literals with Complex Types

Another reason you may need the nested form of FOREACH is to assemble a complex literal. If we wanted to draw key events in a player’s history — birth, death, start and end of career — on a timeline, or wanted to place the location of their birth and death on a map, it would make sense to prepare generic baskets of events and location records. We will solve this problem in a few different ways to demonstrate assembling complex types from simple fields.

Note
To use the functions in Piggybank and DataFu, we have to REGISTER their jar files so Pig knows about them. This is accomplished with the REGISTER command, as in: REGISTER /usr/lib/pig/datafu.jar.
Parsing a Date

Parsing dates in Pig is easy with the ToDate and SPRINTF functions.

Assembling Complex Types (ch_05/foreach.pig)
REGISTER /usr/lib/pig/datafu.jar
DEFINE Coalesce datafu.pig.util.Coalesce();
DEFINE SPRINTF datafu.pig.util.SPRINTF();

-- Assembling complex types (Only for Pig >= 0.14.0)
people = FILTER people BY (beg_date IS NOT NULL) AND
                          (end_date IS NOT NULL) AND
                          (birth_year IS NOT NULL) AND
                          (death_year IS NOT NULL);

date_converted = FOREACH people {
    beg_dt   = ToDate(CONCAT(beg_date, 'T00:00:00.000Z'));
    end_dt   = ToDate(end_date, 'yyyy-MM-dd', '+0000');
    birth_dt = ToDate(
        SPRINTF('%s-%s-%sT00:00:00Z', birth_year, Coalesce(birth_month,1), Coalesce(birth_day,1))
    );
    death_dt = ToDate(
        SPRINTF('%s-%s-%sT00:00:00Z', death_year, Coalesce(death_month,1), Coalesce(death_day,1))
    );

    GENERATE player_id, birth_dt, death_dt, beg_dt, end_dt, name_first, name_last;
};

One oddity of the people table’s structure as it arrived to us is that the birth/death dates are given with separate fields, while the beginning/end of career dates are given as ISO date strings. We left that alone because this kind of inconsistency is the reality of data sets in practice — in fact, this is about as mild a case as you’ll find. So one thing we’ll have to do is pick a uniform date representation and go forward with it.

You may have heard the saying "The two hardest things in Computer Science are cache coherency and naming things". Our nominations for the two most horrible things in Computer Science are time zones and character encoding [11]. Our rule for Time Zones is "put it in UTC immediately and never speak of it again" [12]. A final step in rendering data for an end-user interface may convert to local time, but at no point in data analysis should you tolerate anything but UTC. We’re only working with dates right here, but we’ll repeat that rule every chance we have in the book.

There are two and a half defensible ways to represent a date or time:

  • As an ISO 8601 Date/Time string in the UTC time zone. It sounds scary when we say "ISO 8601", but it’s self-explanatory and you see it all over the place: '2007-08-09T10:11:12Z' is an example of a time, and '2007-08-09' is an example of a date. It’s compact enough to not worry about, there’s little chance of it arriving in that format by accident, everything everywhere can parse it, and you can do ad-hoc manipulation of it using string functions (e.g. (int)SUBSTRING(end_date,0,4) to extract a year). Use this format only if you are representing instants that come after the 1700s, only need seconds-level precision, and human readability is more important than compactness (which we encourage).

  • As an integer number of epoch milliseconds in the UTC time zone, which is to say as the number of elapsed milliseconds since midnight January 1st, 1970 UTC. (You may see this referred to as 'UNIX time'.) It allows you to easily calculate durations, and is nearly universal as well. Its value fits nicely in an unsigned 64-bit long. We believe using fractional epoch time — e.g. 1186654272892.657 to mean 657 microseconds into the given second — is carrying the joke too far. If you care about micro- or nano-seconds, then you need to care about floating point error, and the leading part of the number consumes too much of your precision. Use this format only if you are representing instants that come after the start of the epoch; only need millisecond precision; and don’t care about leap seconds.

  • A domain representation chosen judiciously by an expert. If neither of the above two representations will work for you then sorry: you need to get serious. Astronomers and anyone else working at century scale will likely use some form of Julian Date; those working at nanosecond scale should look at TAI; there are dozens of others. You’ll probably have to learn things about leap seconds or sidereal times or the fluid space-time discontinuum that is the map of Time Zones, and you will wish you didn’t have to. We’re not going to deal with this category as it’s far, far beyond the scope of the book.

In general we will leave times in their primitive data type (long for epoch milliseconds, chararray for ISO strings) until we need them to be proper datetime data structures. The lines above show a couple ways to create datetime values; here’s the fuller catalog.

Epoch milliseconds are easily converted by calling ToDate(my_epoch_millis). For an ISO format string with date, time and time zone, pass it as a single chararray string argument: ToDate(beg_date). If it lacks the time-of-day or time zone part, you must fill it out first: ToDate(CONCAT(beg_date, 'T00:00:00.000Z')). If the string has a non-standard format, supply two additional arguments: a template according to Java’s SimpleDateFormat, and, unless the input has a timezone, the UTC time zone string '+0000'. For example, ToDate(end_date, 'yyyy-MM-dd', '+0000') demonstrates another way to parse an ISO date string: viable, but more expensive than the one-arg version.

For composite year-month-day-etc fields, create an ISO-formatted string and pass it to ToDate. Here’s the snippet we used, in slow motion this time:

ToDate(
  SPRINTF('%s-%s-%sT00:00:00Z',		     -- ISO format template
    birth_year,				     -- if year is NULL, value will be null
    (birth_month IS NULL ? 1 : birth_month), -- but coerce null month or day to 1
    (birth_day IS NULL ? 1 : birth_day)
  ));
Note
Apart from subtracting one epoch milliseconds from another to get a duration in milliseconds, you must never do any date/time manipulation except through a best-in-class date library. You can’t calculate the difference of one year by adding one to the year field (which brought down Microsoft’s cloud storage product on the leap day of February 29th, 2012), and you can’t assume that the time difference from one minute to the next is 60 seconds (which brought down HBase servers worldwide when the leap second of 2012-06-30T23:59:60Z — note the :60 — occurred). This is no joke — companies go out of business because of mistakes like these.
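
For instance, here is a minimal sketch of letting Pig’s built-in datetime functions do the calendar math on the date_converted relation from above, rather than fiddling with year fields by hand:

-- Career length computed by the datetime library, not by subtracting year fields.
career_spans = FOREACH date_converted GENERATE
    player_id,
    YearsBetween(end_dt, beg_dt) AS career_years,
    DaysBetween(end_dt, beg_dt)  AS career_days;
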
Assembling a Bag

Sometimes it can be useful to create complex objects to be referenced and processed later. While we try to keep schemas as simple as possible, sometimes you want a bag.

We can take things further and assemble truly complex records of bags of tuples, and nested tuples.

Assembling Very Complex Types (ch_05/foreach.pig)
graphable = FOREACH people {
    birth_month = Coalesce(birth_month, 1); birth_day = Coalesce(birth_day, 1);
    death_month = Coalesce(death_month, 1); death_day = Coalesce(death_day, 1);
    beg_dt   = ToDate(beg_date);
    end_dt   = ToDate(end_date, 'yyyy-MM-dd');
    birth_dt = ToDate(SPRINTF('%s-%s-%s', birth_year, birth_month, birth_day));
    death_dt = ToDate(SPRINTF('%s-%s-%s', death_year, death_month, death_day));
    --
    occasions = {
        ('birth', birth_year, birth_month, birth_day),
        ('death', death_year, death_month, death_day),
        ('debut',
            (int)SUBSTRING(beg_date,0,4),
            (int)SUBSTRING(beg_date,5,7),
            (int)SUBSTRING(beg_date,8,10)
        ),
        ('lastg',
            (int)SUBSTRING(end_date,0,4),
            (int)SUBSTRING(end_date,5,7),
            (int)SUBSTRING(end_date,8,10)
        )
    };
    --
    places = (
        (birth_dt, birth_city, birth_state, birth_country),
        (death_dt, death_city, death_state, death_country)
    );

    GENERATE
    player_id,
    occasions AS occasions:bag{occasion:(name:chararray, year:int, month:int, day:int)},
    places    AS places:tuple( birth:tuple(date, city, state, country),
                               death:tuple(date, city, state, country) )
    ;
};

The occasions intermediate alias is a bag of event tuples holding a chararray and three ints. Bags are disordered (unless you have transiently applied an explicit sort), and so we’ve prefixed each event with a slug naming the occasion.

You can do this inline (non-nested FOREACH) but we wouldn’t, as it is not readable.

Manipulating the Type of a Field

We used StringConcat to combine players' city, state and country of birth into a combined field without drama. But if we tried to do the same for their date of birth by writing StringConcat(birth_year, '-', birth_month, '-', birth_day), Pig would throw an error: Could not infer the matching function for org.apache.pig.builtin.StringConcat…​. You see, StringConcat understandably wants to consume and deliver strings, and so isn’t in the business of guessing at and fixing up types. What we need to do is coerce the int values — e.g. 1961, a 32-bit integer — into chararray values — e.g. '1961', a string of four characters. You do so using a C-style typecast expression: (chararray)birth_year. Here it is in action:

Manipulating Field Types (ch_05/types.pig)
birthplaces = FOREACH people GENERATE
    player_id,
    StringConcat(
        (chararray)birth_year, '-',
        (chararray)birth_month, '-',
        (chararray)birth_day
    ) AS birth_date
;

In other cases you don’t need to manipulate the type going in to a function, you need to manipulate the type going out of your FOREACH.

Here are several takes on a FOREACH statement to find the on-base percentage:

Managing Floating Point Schemas (ch_05/types.pig)
obp_1 = FOREACH bat_seasons {
  OBP = 1.0f * (H + BB + HBP) / PA; -- constant is a float
  GENERATE OBP;                     -- making OBP a float
};
-- obp_1: {OBP: float}

The first stanza matches what was above. We wrote the literal value as 1.0f — which signifies the float value 1.0 — thus giving OBP the implicit type float as well.

obp_2 = FOREACH bat_seasons {
  OBP = 1.0 * (H + BB + HBP) / PA;  -- constant is a double
  GENERATE OBP;                     -- making OBP a double
};
-- obp_2: {OBP: double}

In the second stanza, we instead wrote the literal value as 1.0 — type double — giving OBP the implicit type double as well.

obp_3 = FOREACH bat_seasons {
  OBP = (float)(H + BB + HBP) / PA; -- typecast forces floating-point arithmetic
  GENERATE OBP AS OBP;              -- making OBP a float
};
-- obp_3: {OBP: float}

The third stanza takes a different tack: it forces floating-point math by typecasting the result as a float, thus also implying type float for the generated value [13].

obp_4 = FOREACH bat_seasons {
  OBP = 1.0 * (H + BB + HBP) / PA;  -- constant is a double
  GENERATE OBP AS OBP:float;        -- but OBP is explicitly a float
};
-- obp_4: {OBP: float}

In the fourth stanza, the constant was given as a double. However, this time the AS clause specifies not just a name but an explicit type, and that takes precedence [14].

broken = FOREACH bat_seasons {
  OBP = (H + BB + HBP) / PA;        -- all int operands means integer math and zero as result
  GENERATE OBP AS OBP:float;        -- even though OBP is explicitly a float
};
-- broken: {OBP: float}

The fifth and final stanza exists just to re-prove the point that if you care about the types Pig will use, say something. Although the output type is a float, the intermediate expression is calculated with integer math and so all the answers are zero. Even if that worked, you’d be a chump to rely on it: use any of the preceding four stanzas instead.

Ints and Floats and Rounding, Oh My!

Another occasion for type conversion comes when you are trying to round or truncate a fractional number.

The first four fields of the following statement turn the full-precision result of calculating OBP (0.31827113) into a result with three fractional digits (0.318), as OBP is usually represented.

Rounding (ch_05/types.pig)
rounded = FOREACH bat_seasons GENERATE
  (ROUND(1000.0f*(H + BB + HBP) / PA)) / 1000.0f AS round_and_typecast,
  ((int)(1000.0f*(H + BB + HBP) / PA)) / 1000.0f AS typecast_only,
  (FLOOR(1000.0f*(H + BB + HBP) / PA)) / 1000    AS floor_and_typecast,
  ROUND_TO( 1.0f*(H + BB + HBP) / PA, 3)         AS what_we_would_use,
  SPRINTF('%5.3f', 1.0f*(H + BB + HBP) / PA)     AS but_if_you_want_a_string_just_say_so,
  1.0f*(H + BB + HBP) / PA                       AS full_value
  ;

The round_and_typecast field shows a fairly common (and mildly flawed) method for chunking or partially rounding values: scale-truncate-rescale. Multiplying 0.31827113 by 1000.0f gives a float result 318.27113; rounding it gets an integer value 318; rescaling by 1000.0f gives a final result of 0.318f, a float. The second version works mostly the same way, but has no redeeming merits. Use a typecast expression when you want to typecast, not for its side effects. This muddy formulation leads off with a story about casting things to type int, but only a careful ticking off of parentheses shows that we swoop in at the end and implicitly cast to float. If you want to truncate the fractional part, say so by using the function for truncating the fractional part, as the third formulation does. The FLOOR method uses machine numeric functions to generate the value. This is likely more efficient, and it is certainly more correct.

Floating-point arithmetic, like Unicode normalization and anything involving cryptography, has far more complexity than anyone who wants to get things done can grasp. At some point, take time to become aware of the built-in math functions that are available [15]. You don’t have to learn them, just stick the fact of their existence in the back of your head. If the folks at the IEEE have decided every computer on the planet should set aside silicon for a function to find the log of 1 plus 'x' (log1p), or a function to find the remainder when dividing two numbers (IEEEremainder), you can bet there’s a really good reason why your stupid way of doing it is some mixture of incorrect, inaccurate, or fragile.

That is why the formulation we would actually use to find a rounded number is the fourth one. It says what we mean ("round this number to three decimal places") and it draws on Java library functions built for just this purpose. The error between the ROUND formulation and the ROUND_TO formulation is almost certainly miniscule. But multiply "miniscule" by a billion records and you won’t like what comes out.

Calling a User-Defined Function (UDF) from an External Package

You can extend Pig’s functionality with 'User-Defined Functions' (UDFs) written in Java, Python, Ruby, JavaScript and other languages. These have first-class standing: almost all of Pig’s native functions are actually Java UDFs that just happen to live in a built-in namespace. We’ll describe how to author a UDF in a later chapter (REF), but this is a good time to learn how to call one.

The DataFu package is a collection of Pig extensions open-sourced by LinkedIn, and in our opinion everyone who uses Pig should install it. It provides the most important flavors of hash digest and checksum you need in practice, and its documentation explains how to choose the right one. For consistent hashing purposes, the right choice is the "Murmur3" function [16], and since we don’t need many bytes we’ll use the 32-bit flavor.

You must do two things to enable use of a UDF. First, so that Pig can load the UDF’s code, call the REGISTER command with the path to the UDF’s .jar file. You only need to REGISTER a jar once, even if you’ll use more than one of its UDFs.

Second, use the DEFINE command to construct it. DEFINE takes two arguments, separated by spaces: the short name you will use to invoke the command, and the fully-qualified package name of its class (e.g. datafu.pig.hash.Hasher). Some UDFs, including the one we’re using, accept or require constructor arguments (always strings). These are passed function-call style, as shown below. There’s nothing wrong with DEFINE-ing a UDF multiple times with different constructor arguments; for example, adding a line DEFINE DigestMD5 datafu.pig.hash.Hasher('md5'); would create a hash function that used the MD5 (REF) algorithm.

REGISTERing jars and Using UDFs (ch_05/udf.pig)
-- Register the jar containing the UDFs
REGISTER /usr/lib/pig/datafu.jar
-- Murmur3, 32 bit version: a fast statistically smooth hash digest function
DEFINE Digest datafu.pig.hash.Hasher('murmur3-32');

bat_seasons = LOAD '/data/gold/sports/baseball/bat_seasons.tsv' USING PigStorage('\t') AS (
    player_id:chararray, name_first:chararray, name_last:chararray,     --  $0- $2
    year_id:int,        team_id:chararray,     lg_id:chararray,         --  $3- $5
    age:int,  G:int,    PA:int,   AB:int,  HBP:int,  SH:int,   BB:int,  --  $6-$12
    H:int,    h1B:int,  h2B:int,  h3B:int, HR:int,   R:int,    RBI:int  -- $13-$19
);
bat_seasons = FILTER bat_seasons BY PA > 0 AND AB > 0;

-- Prepend a hash of the player_id
keyed_seasons = FOREACH bat_seasons GENERATE Digest(player_id) AS keep_hash, *;

-- Prepare a reproducible sample of bat seasons
some_seasons  = FOREACH (
    FILTER keyed_seasons BY (SUBSTRING(keep_hash, 0, 1) == '0')
  ) GENERATE $0..;
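
Because the digest comes back as a string of hex characters, keeping only the records whose digest begins with '0' retains roughly one record in sixteen, and it retains the same records every time the script runs. To change the sample size, compare against a longer (or different) prefix. A sketch, reusing the keyed_seasons relation above:

-- Roughly a 1-in-256 sample: match two leading hex characters instead of one
tiny_sample = FOREACH (
    FILTER keyed_seasons BY (SUBSTRING(keep_hash, 0, 2) == '00')
  ) GENERATE $0..;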

Operations that Break One Table Into Many

Pig Latin is a language defined by the manipulation of dataflows. In Pig, we often load an entire dataset at once and manipulate it as a stream. Streams needn’t remain singular: they can split and merge like real rivers, flowing through different filters and operations before re-joining further on. When writing Pig scripts, it helps to visualize your data as a dataflow. Now let’s learn about SPLIT and UNION.

Directing Data Conditionally into Multiple Data Flows (SPLIT)

The careers table gives the number of times each player was elected to the All-Star game (indicating extraordinary performance during a season) and whether they were elected to the Hall of Fame (indicating a truly exceptional career).

Demonstration in Pig

Separating those records into different data flows isn’t straightforward in map/reduce, but it’s very natural using Pig’s SPLIT operation.

Splitting Relations (ch_05/split_union.pig)
SPLIT bat_seasons
  INTO young   IF age <= 30,
       middle  IF (age >= 30) AND (age < 40),
       old OTHERWISE
;
STORE young  INTO 'young_players';
STORE middle INTO 'middle_age_players';
STORE old    INTO 'old_players';

The SPLIT operator does not short-circuit: every record is tested against every condition, so a player who is 30 will be written to both young_players and middle_age_players.

The most natural use of the SPLIT operator is when you really do require divergent processing flows. In the next chapter, you’ll use a JOIN LEFT OUTER to geolocate records (derive longitude and latitude from a place name). That method is susceptible to missing matches, so in practice a next step might be to apply a fancier but more costly geolocation tool. This is a strategy that arises often in advanced machine learning applications: run a first pass with a cheap algorithm that can estimate its error rate; isolate the low-confidence results for harder processing; then reunite the whole dataset, as sketched below.
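
A sketch of that pattern in Pig follows. Everything in it is hypothetical: the geolocated relation, its lng/lat fields, and the relocated relation the fancier tool would produce are stand-ins for whatever your own pipeline uses.

-- Hypothetical sketch: keep the records the cheap pass located, set the misses
-- aside for costlier processing, then reunite the stream (UNION is covered below)
SPLIT geolocated INTO
  located_ok  IF (lng IS NOT NULL) AND (lat IS NOT NULL),
  needs_work  OTHERWISE;
-- ... run the fancier, slower geolocation over needs_work, producing a relation
-- 'relocated' with the same schema as located_ok ...
all_located = UNION located_ok, relocated;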

The syntax of the SPLIT command does not have an equals sign to the left of it; the new table aliases are created in its body.

Operations that Treat the Union of Several Tables as One

The counterpart to splitting a table into pieces is to treat many pieces as a single table. This really only makes sense when all those pieces have the same schema, so that’s the only case we’ll handle here.

Treat Several Pig Relation Tables as a Single Table (Stacking Rowsets)

In Pig, you can rejoin several pipelines using the UNION operation. The tables we’ve been using so far cover only batting stats; there is another set of tables covering stats for pitchers, and in rare cases a player may appear in only one or the other. Whatever the source tables, the pattern is the same: project down to the fields you want (giving every stream a uniform schema), then unify the streams. Here we demonstrate it on three sets of player seasons, split out by age:

Union Treats Several Tables as a Single Table (ch_05/split_union.pig)
-- Unions can bring relations with the same schema together
young_player_seasons = LOAD 'young_player_seasons' USING PigStorage('\t') AS (
    player_id:chararray, name_first:chararray, name_last:chararray,     --  $0- $2
    year_id:int,        team_id:chararray,     lg_id:chararray,         --  $3- $5
    age:int,  G:int,    PA:int,   AB:int,  HBP:int,  SH:int,   BB:int,  --  $6-$12
    H:int,    h1B:int,  h2B:int,  h3B:int, HR:int,   R:int,    RBI:int  -- $13-$19
);
middle_age_player_seasons = LOAD 'middle_age_player_seasons' USING PigStorage('\t') AS (
    player_id:chararray, name_first:chararray, name_last:chararray,     --  $0- $2
    year_id:int,        team_id:chararray,     lg_id:chararray,         --  $3- $5
    age:int,  G:int,    PA:int,   AB:int,  HBP:int,  SH:int,   BB:int,  --  $6-$12
    H:int,    h1B:int,  h2B:int,  h3B:int, HR:int,   R:int,    RBI:int  -- $13-$19
);
old_player_seasons = LOAD 'old_player_seasons' USING PigStorage('\t') AS (
    player_id:chararray, name_first:chararray, name_last:chararray,     --  $0- $2
    year_id:int,        team_id:chararray,     lg_id:chararray,         --  $3- $5
    age:int,  G:int,    PA:int,   AB:int,  HBP:int,  SH:int,   BB:int,  --  $6-$12
    H:int,    h1B:int,  h2B:int,  h3B:int, HR:int,   R:int,    RBI:int  -- $13-$19
);

young_names = FOREACH young_player_seasons GENERATE player_id, name_first, name_last;
middle_age_names = FOREACH middle_age_player_seasons GENERATE player_id, name_first, name_last;
old_names = FOREACH old_player_seasons GENERATE player_id, name_first, name_last;

all_players = UNION young_names, middle_age_names, old_names;
all_unique_players = DISTINCT all_players;

Note that this is not a join. A join requires a reduce and changes the schema of the records; a union is more like stacking one table atop another, making no changes to the records (schema or otherwise) and requiring no reduce.

A common use of the UNION statement comes in 'symmetrizing' a relationship. For example, each line in the games table describes, in a sense, two game outcomes: one for the home team and one for the away team. We might reasonably want to prepare another table listing game outcomes: game_id, team, opponent, the team’s home/away position, the team’s score and the opponent’s score. A game in which BAL plays at BOS on date XXX (final score BOS Y, BAL Z) would then get two lines: GAMEIDXXX BOS BAL 1 Y Z and GAMEIDXXX BAL BOS 0 Z Y.

Symmetrizing a Relationship (ch_05/split_union.pig)
games = LOAD '/data/gold/sports/baseball/games_lite.tsv' AS (
  game_id:chararray,      year_id:int,
  away_team_id:chararray, home_team_id:chararray,
  away_runs_ct:int,       home_runs_ct:int
);

games_a = FOREACH games GENERATE
  home_team_id AS team,     year_id,
  home_runs_ct AS runs_for, away_runs_ct AS runs_against, 1 AS is_home:int;

games_b = FOREACH games GENERATE
  away_team_id AS team,     year_id,
  away_runs_ct AS runs_for, home_runs_ct AS runs_against, 0 AS is_home:int;

team_scores = UNION games_a, games_b;

STORE team_scores INTO 'team_scores';

DESCRIBE team_scores;
-- team_scores: {team: chararray,year_id: int,runs_for: int,runs_against: int,is_home: int}

The UNION operation does not remove duplicate rows the way a set-wise union would; it simply tacks one table onto the end of the other, which is why the first example above finished with a DISTINCT to eliminate duplicate players (more on DISTINCT in the next chapter (REF)). The UNION operation also provides no guarantees on the ordering of rows. Some SQL users may fall into the trap of doing a UNION-then-GROUP to combine multiple tables. This is terrible in several ways, and you should instead use the COGROUP operation; see the Won-Loss Record example in the next chapter (REF).

Note
The UNION operator is easy to over-use. For one example, in the next chapter we’ll extend the first part of this code to prepare win-loss statistics by team. A plausible first guess would be to follow the UNION statement above with a GROUP statement, but a much better approach uses COGROUP instead (both operators are explained in the next chapter). The UNION statement is mostly harmless but fairly rare in use; give it a second look any time you find yourself writing it into a script.

Wrapping Up

The operations in this chapter do not require a reduce of their own, which makes them very efficient: arbitrarily long chains of map-only operations can be composed one after the other and accomplished in a single phase of a Hadoop map/reduce job. Although we call them 'map-only' operations, they can be executed either in the map phase or at the tail of the reduce phase. That ability to pipeline map-only operations is a large part of what lets map/reduce scale through complex workflows.

You can now LOAD, STORE, FILTER, UNION, SPLIT, LIMIT and FOREACH/GENERATE. You can accomplish quite a bit with these operations alone. For instance, you could open an Apache web server log file, search for a particular IP address, and print only those records; or you could load stock ticker data and home in on just one or two stocks.
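
A sketch of that first example, with an invented path, schema and IP address standing in for your own:

-- Hypothetical sketch: pull one IP address's requests out of a web server log
logs   = LOAD 'access_logs.tsv' USING PigStorage('\t') AS (
           ip:chararray, ts:chararray, verb:chararray, path:chararray, status:int);
one_ip = FILTER logs BY ip == '10.12.14.16';
STORE one_ip INTO 'hits_from_one_ip';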

This chapter gives you a powerful tool chest with which to modify individual records in isolation. The really interesting applications, however, come when we put data into context by grouping similar records in the reduce phase of map/reduce. This is the subject of the next chapter: grouping data.


1. In this and in further scripts, we’re going to omit the LOAD, STORE and other boilerplate except to prove a point. See the example code (REF) for fully-working snippets.
2. Not coincidentally, that figure of 450 PA is close to the "qualified" season threshold of 3.1 plate appearances per team game that are required for seasonal performance awards
3. A demonstration of the general principle that if you believe an analysis involving people will be simple, you’re probably wrong.
4. For a dynamic language such as Ruby, it can often be both faster and cleaner to reformat the table into the language itself than to parse a data file. Loading the table is now a one-liner (require "lookup_table"), and there’s nothing the Ruby interpreter does faster than interpret Ruby.
5. Spoiler alert: No, you don’t have to give up all hope when Pig lacks a built-in function you require.
6. Although you might re-rank things when we show you how to misuse Hadoop to stress-test a webserver with millions of concurrent requests per minute (REF)
7. The country field uses some ad-hoc mixture of full name and arbitrary abbreviations. In practice, we would have converted the country fields to use ISO two-letter abbreviations — and that’s just what we’ll do in a later section (REF)
8. We also warned you we’d wander away from it frequently — the bulk of it sits in the next chapter.
9. Here’s to you, 1970 Rod Carew and 1979 Mario Mendoza
10. Although known as percentages, OBP and SLG are always given as fractions to 3 decimal places. For OBP, we’re also using a slightly modified formula to reduce the number of stats to learn. It gives nearly identical results but you will notice small discrepancies with official figures
11. Many people add "…and off-by-one errors" to the hardest-things list. If we are allowed to re-use the same joke, the two most horrible things in Computer Science are #1 Time Zones, #2 Character Encoding, #2 Threads.
12. You can guess our rule for character encoding: "put it in UTF-8 immediately and never speak of it again."
13. As you can see, for most of the stanzas Pig picked up the name of the intermediate expression (OBP) as the name of that field in the schema. Weirdly, the typecast in the third stanza makes the current version of Pig lose track of the name, so we chose to provide it explicitly
14. Is the intermediate result calculated using double-precision math, because it starts with a double, and then converted to float? Or is it calculated with single-precision math, because the result is a float? We don’t know, and even if we did we wouldn’t tell you. Don’t resolve language edge cases by consulting the manual, resolve them by using lots of parentheses and typecasts and explicitness. If you learn fiddly rules like that — operator precedence is another case in point — there’s a danger you might actually rely on them. Remember, you write code for humans to read and only incidentally for robots to run.
15. either as Pig built-ins, or through the Piggybank UDF library
16. Those familiar with the MD5 or SHA hashes might have expected we’d use one of them. Those would work as well, but Murmur3 is faster and has superior statistical properties; for more, see the DataFu documentation. Oh and if you’re not familiar with any of the stuff we just said: don’t worry about it, just know that 'murmur3-32' is what you should type in.