Analytic Patterns: Ordering Operations

In this chapter we will cover ordering operations, or operations that sort data according to some criteria. Pig has two concepts of order - entire datasets can be sorted, as can the contents of a bag. We’ll learn how to sort relations and bags, and also how to calculate the top records of a relation by combining ORDER with LIMIT. With these skills in hand, we’ll be one step closer to being able to solve any arbitrary data processing task using the set of patterns we’ve learned.

Ordering operations are a fundamental part of storytelling. A big part of telling stories with data is coming up with examples that prove a point. This means diving into the data to produce the most exceptional records. When data is big, this invariably means you need to sort the data to pick up the highest or lowest value(s) of some metric.

So far we’ve mostly limited ourselves to the ordering inherently provided by the map/reduce shuffle/sort operation, which provides a sorted list on the reduce key within each output file. If we’re running a small job with a single reducer, that does amount to a total sort. However, if we want an overall sort using multiple reducers (as we must, if we’re working with 'big data'), we must employ Pig’s ORDER command. Let’s begin!

Preparing Career Epochs

In order to demonstrate ordering records, we’re going to prepare a dataset detailing the performance of players at three phases of their career: young, prime and older. To do so, we’ll be making use of familiar patterns. We use the map-only patterns of 'Selecting Records that Satisfy a Condition' and 'Selecting Records that Satisfy Multiple Conditions' with an initial FILTER, to include only seasons in the National or American leagues in our analysis. The map-only patterns 'Transform Records Individually using FOREACH' and 'A Nested FOREACH Allows Intermediate Expressions' assist in calculating the properties of player seasons. Finally, we employ 'Summarizing Multiple Subsets of a Group Simultaneously' to compute career metrics across different age categories.

Career Epochs (ch_08/career_epochs.pig)
mod_seasons = FILTER bat_seasons BY ((year_id >= 1900) AND (lg_id == 'NL' OR lg_id == 'AL'));

-- Create a season marked with the period of career it was in: young/prime/older
age_seasons = FOREACH mod_seasons {
    young = (age <= 21               ? true : false);
    prime = (age >= 22 AND age <= 29 ? true : false);
    older = (age >= 30               ? true : false);
    OB = H + BB + HBP;
    TB = h1B + 2*h2B + 3*h3B + 4*HR;
    GENERATE
        player_id,
        year_id,
        PA AS PA_all,
        AB AS AB_all,
        OB AS OB_all,
        TB AS TB_all,
        (young ? 1 : 0) AS is_young,
        (young ? PA : 0) AS PA_young, (young ? AB : 0) AS AB_young,
        (young ? OB : 0) AS OB_young, (young ? TB : 0) AS TB_young,
        (prime ? 1 : 0) AS is_prime,
        (prime ? PA : 0) AS PA_prime, (prime ? AB : 0) AS AB_prime,
        (prime ? OB : 0) AS OB_prime, (prime ? TB : 0) AS TB_prime,
        (older ? 1 : 0) AS is_older,
        (older ? PA : 0) AS PA_older, (older ? AB : 0) AS AB_older,
        (older ? OB : 0) AS OB_older, (older ? TB : 0) AS TB_older
    ;
};

-- Calculate metrics by career epoch: young/prime/older
career_epochs = FOREACH (GROUP age_seasons BY player_id) {
    PA_all    = SUM(age_seasons.PA_all  );
    PA_young  = SUM(age_seasons.PA_young);
    PA_prime  = SUM(age_seasons.PA_prime);
    PA_older  = SUM(age_seasons.PA_older);
    -- OBP = (H + BB + HBP) / PA
    OBP_all   = 1.0f*SUM(age_seasons.OB_all)   / PA_all  ;
    OBP_young = 1.0f*SUM(age_seasons.OB_young) / PA_young;
    OBP_prime = 1.0f*SUM(age_seasons.OB_prime) / PA_prime;
    OBP_older = 1.0f*SUM(age_seasons.OB_older) / PA_older;
    -- SLG = TB / AB
    SLG_all   = 1.0f*SUM(age_seasons.TB_all)   / SUM(age_seasons.AB_all);
    SLG_prime = 1.0f*SUM(age_seasons.TB_prime) / SUM(age_seasons.AB_prime);
    SLG_older = 1.0f*SUM(age_seasons.TB_older) / SUM(age_seasons.AB_older);
    SLG_young = 1.0f*SUM(age_seasons.TB_young) / SUM(age_seasons.AB_young);
    --
    GENERATE
        group AS player_id,
        PA_all   AS PA_all,
        PA_young AS PA_young,
        PA_prime AS PA_prime,
        PA_older AS PA_older,
        --
        MIN(age_seasons.year_id)  AS beg_year,
        MAX(age_seasons.year_id)  AS end_year,
        --
        OBP_all   + SLG_all       AS OPS_all:float,
        (PA_young >= 700 ? OBP_young + SLG_young : null) AS OPS_young:float,
        (PA_prime >= 700 ? OBP_prime + SLG_prime : null) AS OPS_prime:float,
        (PA_older >= 700 ? OBP_older + SLG_older : null) AS OPS_older:float,
        --
        COUNT_STAR(age_seasons)   AS n_seasons,
        SUM(age_seasons.is_young) AS n_young,
        SUM(age_seasons.is_prime) AS n_prime,
        SUM(age_seasons.is_older) AS n_older
    ;
};

STORE career_epochs INTO 'career_epochs';

We’ll be using this epoch data throughout the chapter to demonstrate different ordering techniques, so don’t delete the data in the career_epochs directory!

Sorting All Records in Total Order

Anyone who has performed a SQL ORDER BY query has prepared a dataset and then sorted it for human consumption. Indeed, creating metrics and then sorting records based on them is at the heart of any data analysis. For this reason, ORDER is a one-line command in Pig:

sorted_records = ORDER records BY field1;

For this analysis, we’re only going to look at players who were able to make solid contributions over several years. We’ll define this as playing for five or more seasons, with 2000 or more plate appearances (enough to show statistical significance), and an OPS of 0.650 (an acceptable-but-not-allstar level) or better. This means we must FILTER, then ORDER, and finally LIMIT to a data size we, as humans (as opposed to Mentats), can read:

Career Epochs (ch_08/order.pig)
career_epochs = FILTER career_epochs BY
    ((PA_all >= 2000) AND (n_seasons >= 5) AND (OPS_all >= 0.650));

career_young = ORDER career_epochs BY OPS_young DESC;
top_10_young = LIMIT career_young 10;

career_prime = ORDER career_epochs BY OPS_prime DESC;
top_10_prime = LIMIT career_prime 10;

career_older = ORDER career_epochs BY OPS_older DESC;
top_10_older = LIMIT career_older 10;

You’ll spot Ted Williams (willite01) as one of the top three young players, top three prime players, and top three old players. Ted Williams was pretty awesome.

Top Young Players
(willite01,9788,1336,3279,5173,1939,1960,1.115402,1.0398661,1.1660492,1.103679,19,2,5,12)
(foxxji01,9676,1302,5306,3068,1925,1945,1.0341599,1.0045433,1.0723403,0.98065215,20,5,8,7)
(troskha01,5749,732,4122,895,1933,1946,0.890712,0.9756794,0.919405,0.6866708,11,2,7,2)

To put all records in a table in order, it’s not sufficient to use the sorting that each reducer applies to its input. If you sorted names from a phonebook, file part-00000 would have names that start with A, then B, up to Z; part-00001 would also have names from A to Z; and so on. The collection has a partial order, but we want the 'total order' that Pig’s ORDER BY operation provides. In a total sort, each record in part-00000 is in order and precedes every record in part-00001; records in part-00001 are in order and precede every record in part-00002; and so forth. For this reason, Pig’s ORDER command is necessary whenever we have more than one reducer.
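As a minimal sketch of what that buys you (assuming the career_epochs relation prepared above), forcing several reducers with a PARALLEL clause still yields one total order across the output part files:

-- Total order across three reducers: part-00000 holds the lowest player_ids,
-- part-00002 the highest; concatenating the parts in order gives one fully
-- sorted dataset.
sorted_epochs = ORDER career_epochs BY player_id ASC PARALLEL 3;
STORE sorted_epochs INTO 'career_epochs_sorted';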

Sorting by Multiple Fields

Sorting by one field is great, but sometimes our data is a little more complex than that. For instance, we might want to sort by one metric but use another as a tie-breaker. In Pig, sorting on multiple fields is as easy as adding them in order with commas. For instance, to sort by number of older seasons, breaking ties by number of prime seasons:

career_older = ORDER career_epochs
    BY n_older DESC, n_prime DESC;

Wherever reasonable, "stabilize" your sorts by adding enough columns to make the ordering unique. This ensures the output will remain the same from run to run, a best practice for testing and maintainability that we introduced in the 'Map-Only Operations' chapter.

career_older = ORDER career_epochs
  BY n_older DESC, n_prime DESC, player_id ASC; -- makes sure that ties are always broken the same way.

Sorting on an Expression (You Can’t)

Which players have aged the best — made the biggest leap in performance from their prime years to their older years? You might think the following would work, but you cannot use an expression in an ORDER..BY statement:

by_diff_older = ORDER career_epochs BY (OPS_older-OPS_prime) DESC; -- fails!

Instead, generate a new field, sort on it, then project it away. Though it’s cumbersome to type, there’s no significant performance impact.

by_diff_older = FOREACH career_epochs GENERATE
    OPS_older - OPS_prime AS diff,
    player_id..;
by_diff_older = FOREACH (ORDER by_diff_older BY diff DESC, player_id) GENERATE
    player_id..;

If you browse through that table, you’ll get a sense that current-era players seem to be over-represented. This is just a simple whiff of a question, but more nuanced analyses do show an increase in longevity of peak performance. Part of that is due to better training, nutrition, and medical care — and part of that is likely due to systemic abuse of performance-enhancing drugs.

Sorting Case-Insensitive Strings

Pig’s ORDER command will sort capitalized words and lowercase words independently. There’s no intrinsic way to sort case-insensitively; instead, generate a lowercased copy of the field and sort on that. We don’t have an interesting table with mixed-case records in the baseball dataset, but most UNIX-based computers come with a dictionary in the /usr/share directory tree. Here’s how to sort it while ignoring case:

Note: you’ll want to use Pig 'local mode' to run this next command: pig -x local

Case-insensitive Sort
dict        = LOAD '/usr/share/dict/words' AS (word:chararray);
sortable    = FOREACH dict GENERATE LOWER(word) AS l_word, *;
dict_nocase = FOREACH (ORDER sortable BY l_word, word) GENERATE word;
dict_case   = ORDER dict BY word DESC;

Note that we sorted on l_word and word: this stabilizes the sort, ensuring that even though 'Polish' and 'polish' tie case-insensitively, those ties will always be resolved the same way.

Dealing with Nulls When Sorting

Real data has nulls (missing data), and creating sane, rational and consistent dataflows in Pig requires careful thought about how to handle them. Pig’s default behavior is to sort nulls as least-most: they will appear as the first rows for ASC order and as the last rows for DESC order. If you want to alter that behavior, you can project a dummy field having the 'favoritism' or artificial sort order you want to impose. Name this column first in your ORDER..BY clause, and you can achieve whatever 'null behavior' you desire. We call this the 'dummy field trick.'

For example, below we sort players' careers with nulls first, and then in another way with nulls last:

Handling Nulls When Sorting (ch_08/order.pig)
nulls_sort_demo = FOREACH career_epochs GENERATE
    (OPS_older IS NULL ? 0 : 1) AS has_older_epoch,
    player_id..;
nulls_then_vals = FOREACH (ORDER nulls_sort_demo BY
    has_older_epoch ASC,
    OPS_all DESC,
    player_id)
    GENERATE
        player_id..;
vals_then_nulls = FOREACH (ORDER nulls_sort_demo BY
    has_older_epoch DESC,
    OPS_all DESC,
    player_id)
    GENERATE
        player_id..;

Floating Values to the Top or Bottom of the Sort Order

Use the 'dummy field trick' any time you want to float records to the top or bottom of the sort order based on a criterion. The example below moves all players whose careers start in 1985 or later to the top, but otherwise sorts on number of older seasons:

Floating Values to the Top of the Sort Order
post1985_vs_earlier = FOREACH career_epochs GENERATE
    (beg_year >= 1985 ? 1 : 0) AS is_1985,
    player_id..;
post1985_vs_earlier = FOREACH (ORDER post1985_vs_earlier BY
    is_1985 DESC,
    n_older DESC,
    player_id)
    GENERATE
        player_id..;

Note that again we add a tie-breaker, player_id, to the sort.

Pattern in Use
  • Standard Snippet  — ORDER tbl BY mykey;

  • Hello, SQL Users

    • Usually this is part of a SELECT statement; in Pig it stands alone

    • You can’t put an expression in the BY clause

  • Important to Know  — Pound-for-pound, unless followed by a LIMIT statement this is one of the most expensive operations you can perform, requiring two to three jobs and a full reduce

  • Output Count  — Unchanged

  • Records  — Unchanged

  • Data Flow  — Map-only on a sample of the data; Map and Reduce to perform the sort. In some cases, if Pig isn’t confident that it will sample correctly, an extra Map-only job to perform the map-only/pipelinable operations before the sample

Sorting Records within a Group

Sorting an entire relation is powerful, but more often we want to sort data that has been partitioned by some key, as within a GROUP..BY operation. This operation is straightforward enough, and so useful that we’ve been applying it all chapter, but it’s time for it to be properly introduced and for us to clarify a couple of points.

We can sort records within a group using ORDER BY within a nested FOREACH (which we introduced in the 'Map-Only Operations' chapter). Rather than sorting all players, let’s try sorting the players on each team in a given season. Here’s a snippet to list the top four players for each team-season, in decreasing order by plate appearances:

Sorting Records within a Group (ch_08/bat_seasons.pig)
players_PA = FOREACH bat_seasons GENERATE
    team_id,
    year_id,
    player_id,
    name_first,
    name_last,
    PA;

team_playerslist_by_PA = FOREACH (GROUP players_PA BY (team_id, year_id)) {
    players_o_1 = ORDER players_PA BY PA DESC, player_id;
    players_o = LIMIT players_o_1 4;
    GENERATE
        group.team_id,
        group.year_id,
        players_o.(player_id, name_first, name_last, PA) AS players_o;
};

Ordering a group in the nested block immediately following a structural operation does not require extra operations, since Pig is able to simply specify those fields as secondary sort keys. Basically, as long as it happens first in the reduce operation it’s free (though if you’re nervous, look for the line "Secondary sort: true" in the output of EXPLAIN). Messing with a bag before the ORDER..BY causes Pig to instead sort it in-memory using quicksort, but will not cause another map-reduce job. That’s good news unless some bags are so huge they challenge available RAM or CPU, which won’t be subtle.
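Here’s a minimal sketch of the distinction, reusing the players_PA relation from above (the relation names free_sort and paid_sort are ours; verify the plan with EXPLAIN on your Pig version):

-- This ORDER is the first thing in the nested block, so it rides the
-- shuffle's secondary sort for free ("Secondary sort: true" in EXPLAIN).
free_sort = FOREACH (GROUP players_PA BY team_id) {
    by_pa = ORDER players_PA BY PA DESC;
    GENERATE group AS team_id, by_pa.(player_id, PA);
};

-- Touching the bag first means this ORDER is done in-memory with quicksort:
-- no extra map-reduce job, but very large bags can strain RAM.
paid_sort = FOREACH (GROUP players_PA BY team_id) {
    big_pa = FILTER players_PA BY PA >= 100;
    by_pa  = ORDER big_pa BY PA DESC;
    GENERATE group AS team_id, by_pa.(player_id, PA);
};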

If you depend on having a certain sorting, specify it explicitly, even when you notice that a GROUP..BY or some other operation seems to leave it in that desired order. It gives a valuable signal to anyone reading your code, and a necessary defense against some future optimization deranging that order. [1]

Once sorted, the bag’s order is preserved by projections, by most functions that iterate over a bag, and by the nested pipeline operations FILTER, FOREACH, and LIMIT. The return values of nested structural operations CROSS, ORDER BY and DISTINCT do not follow the same order as their input; neither do structural functions such as CountEach (in-bag histogram) or the set operations (REF) described at the end of the chapter. (Note that though their outputs are disarranged, these of course don’t mess with the order of their inputs: everything in Pig is immutable once created.)

team_playerslist_by_PA_2 = FOREACH team_playerslist_by_PA {
    -- will not have same order, even though contents will be identical
    disordered    = DISTINCT players_o;
    -- this ORDER BY does _not_ come for free, though it's not terribly costly
    alt_order     = ORDER players_o BY player_id;
    -- these are all iterative and so will share the same order of descending PA
    still_ordered = FILTER players_o BY PA > 10;
    pa_only       = players_o.PA;
    pretty        = FOREACH players_o GENERATE
        StringConcat((chararray)PA, ':', name_first, ' ', name_last);
    GENERATE
        team_id,
        year_id,
        disordered,
        alt_order,
        still_ordered,
        pa_only,
        BagToString(pretty, '|');
};

Pattern in Use
  • Where You’ll Use It  — Extracting top records from a group (see next). Preceding many UDFs that depend on ordering. To make your output readable. To stabilize results.

  • Hello, SQL Users  — This is not directly analogous to the ORDER BY part of a SELECT statement, as it is done to the inner bag. For users of Oracle and other databases, this is similar to a sort within a windowed query.

  • Important to Know  — If it can be applied to the records coming from the mapper, it’s free. Verify by looking for Secondary sort: true in the output of EXPLAIN

  • Output Count  — Unchanged

  • Records  — Unchanged

Select Rows with the Top-K Values for a Field

On its own, LIMIT will return the first records it finds. What if you want to rank the records — sort by some criteria — so you don’t just return the first ones, but the top ones?

Use the ORDER operator before a LIMIT to guarantee this "top K" ordering. This technique also applies a clever optimization (reservoir sampling, see TODO ref) that sharply limits the amount of data sent to the reducers.

Let’s say you wanted to select the top 20 seasons by number of hits:

Top 20 Player Seasons by Hits (ch_08/bat_seasons.pig)
sorted_seasons = ORDER (FILTER bat_seasons BY PA > 60 AND year_id > 1900) BY H DESC;
top_20_seasons = LIMIT sorted_seasons 20;

In SQL, this would be:

SELECT * FROM bat_season WHERE PA > 60 AND year_id > 1900 ORDER BY H DESC LIMIT 20;

A useful optimization applies when the number of records you will keep (K) is much smaller than the number of records in the table (N), and Pig makes it for you: only the top K records are retained at each Mapper. This is a great demonstration of where a Combiner is useful: after each intermediate merge/sort on the Map side and the Reduce side, the Combiner discards all but the top K records.

Top K Within a Group

Pig’s TOP function accepts a bag and returns a bag holding its top K elements, compared on the column you specify.

Top 5 players per season by RBIs (ch_08/bat_seasons.pig)
top_5_per_season = FOREACH (GROUP bat_seasons BY year_id) GENERATE
    group AS year_id,
    TOP(5,19,bat_seasons); -- 19th column is RBIs (start at 0)

You could achieve the same output with the more cumbersome:

top_5_per_season = FOREACH (GROUP bat_seasons BY year_id) {
    sorted = ORDER bat_seasons BY RBI DESC;
    top_5 = LIMIT sorted 5;
    ascending = ORDER top_5 BY RBI;
    GENERATE
        group AS year_id,
        ascending AS top_5;
};

Numbering Records in Rank Order

The RANK command prepends a ranked label to each record in a relation. You can RANK an entire relation, or rank by one or more fields of each record.

ranked_seasons = RANK bat_seasons;
ranked_rbi_seasons = RANK bat_seasons BY
    RBI DESC,
    H DESC,
    player_id;
ranked_hit_dense = RANK bat_seasons BY
    H DESC DENSE;

If you supply only the name of the table, RANK acts as a pipeline operation, introducing no extra map/reduce stage. Each split is numbered as a unit: the third line of chunk part-00000 gets rank 3, the third line of chunk part-00001 gets rank 3, and so on.

It’s important to know that in current versions of Pig, the RANK operator sets parallelism to one, forcing all data through a single reducer.

gift_id  gift            RANK   RANK gift_id   RANK gift DENSE
1        partridge          1        1              1
4a       calling birds      2        7              4
4b       calling birds      3        7              4
2a       turtle dove        4        2              2
4d       calling birds      5        7              4
5        golden rings       6       11              5
2b       turtle dove        7        2              2
3a       french hen         8        4              3
3b       french hen         9        4              3
3c       french hen        10        4              3
4c       calling birds     11        7              4
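For reference, here’s a sketch of the statements that would produce a table like the one above (the gifts relation is hypothetical, and for the ties shown the ranking key would need to carry just the day number):

gifts_ranked = RANK gifts;                 -- plain row numbering
gifts_by_id  = RANK gifts BY gift_id;      -- ties share a rank; gaps follow
gifts_dense  = RANK gifts BY gift DENSE;   -- ties share a rank; no gaps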

Finding Records Associated with Maximum Values

Sometimes we want to find the record holding the maximum value for a field and preserve the whole record. In Pig, we can do this with a nested ORDER BY/LIMIT inside a FOREACH. For example, for each player-season, find the stint in which the player drove in the most runs:

Finding the Record with the Max Value (ch_08/bat_seasons.pig)
-- For each season by a player, select the stint (team) in which they drove in
-- the most runs. In SQL this is fairly clumsy, involving a self-join and then
-- elimination of ties. In Pig, we can ORDER BY within a FOREACH and then
-- pluck the first element of the bag.

top_stint_per_player_year = FOREACH (GROUP bat_seasons BY (player_id, year_id)) {
    sorted    = ORDER bat_seasons BY RBI DESC;
    top_stint = LIMIT sorted 1;
    stints    = COUNT_STAR(bat_seasons);
    GENERATE
        group.player_id,
        group.year_id,
        stints,
        FLATTEN(top_stint.(team_id, RBI)) AS (team_id, RBI);
};

DUMP @;

It turns out this dataset has no multi-team stints: only the most significant stint is listed in the bat_seasons data.

Shuffle a Set of Records

One common use of Hadoop is to run simulations at scale. When doing this, it is often handy to prepare multiple unique sorts of a single dataset; in other words, multiple 'shuffles' of the same data. To shuffle a set of records, we’re going to apply the 'Assign a Unique ID' pattern to generate an arbitrary key (one that is decoupled from the records' content), and then use that key to order the records.

DEFINE Hasher datafu.pig.hash.Hasher('sip24', 'rand');

people_hashed = FOREACH people GENERATE Hasher(player_id) AS hash, *;

-- RANK..BY sorts on the hash, putting the records in effectively random order
people_ranked = RANK people_hashed BY hash;

-- Back to the original records by skipping the rank and hash fields
people_shuffled = FOREACH people_ranked GENERATE $2..;

STORE people_shuffled INTO 'people_shuffled/1/';

You can run this script multiple times with different output paths to get different shuffles of the same data.
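One convenient way to do that without editing the script is Pig’s parameter substitution (the RUN parameter name and the shuffle.pig script name here are placeholders):

-- Invoke as, e.g.: pig -param RUN=2 shuffle.pig
-- Each run writes a fresh shuffle under its own directory.
STORE people_shuffled INTO 'people_shuffled/$RUN/';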

We use a randomized hash function UDF to key each record, then rank the records by that hash. The important thing here is that the hash function we defined accepts a seed that is mixed in to each record. If you supply a constant to the constructor (see the documentation), then the records will be put into an effectively random order, but the same random order each time. By supplying the string 'rand' as the argument, the UDF will use a different seed on each run. What’s nice about this approach is that although the ordering is different from run to run, it does not exhibit the anti-pattern of changing from task attempt to task attempt: the seed is generated once and then used everywhere. Rather than creating a new random number for each row, you use the hash to define an effectively random ordering, and the seed to choose which random ordering to apply.

Wrapping Up

In this chapter, we’ve learned to sort, rank and order data. We learned how to sort entire relations by one or more fields, and how to prioritize certain records when sorting. We learned how to deal with nulls and mixed-case strings when sorting. We showed how to sort within a group with TOP and a nested ORDER BY. Finally, we learned how to shuffle, or sort randomly, using a hash.

Our bag-of-tricks is getting larger and larger. Soon there will be no data-processing problem you face for which you can’t come up with a solution using the patterns in this book. Your applied knowledge of Pig and Hadoop will amount to a working knowledge of analytics in general, and you’ll be able to process data at scale and implement algorithms on big data. When you become as comfortable processing big data as you are small, there are boundless opportunities to work with the ever-increasing onslaught of new data to create new insights, build new products and make better decisions.

In the next chapter, we’ll learn about creating unique values and relations, and working with sets. This will complete our analytic toolkit.


1. That’s not too hypothetical: there are cases where you could more efficiently group by binning the items directly in a Map rather than sorting.