8ma10s/CS143_Project2

CS 143 Spark Project : Handling User-defined Functions With Hybrid Hash Aggregation

User-defined functions (UDFs) allow developers to define and exploit custom operations within expressions. For instance, say that you have a product catalog that includes photos of the product packaging. You may want to register a user-defined function extract_text that calls an OCR algorithm and returns the text in an image, so that you can get 'queryable' information out of the photos. In SQL, you could imagine a query like this:

SELECT P.name, P.manufacturer, P.price, extract_text(P.image)
  FROM Products P;

The ability to register UDFs is very powerful -- it extends the ability of your data processing framework. Now the question is, what difficulties might we face when we implement UDFs in distributed environments? For this, let's look at Apache Spark. Apache Spark is a leading framework for distributed computing in the mold of Map-Reduce. Spark SQL, built on top of Spark, is one of its popular components. We will briefly touch upon and learn about Spark during Friday's discussion sections. As it turns out, you can implement and register UDFs in Spark SQL as well. But UDFs can often introduce performance bottlenecks, especially as we run them over millions of data items.

You will be completing this assignment in parts. In the first part, you'll implement the caching mechanism of a user-defined function. In the second part, you'll implement the hash-based aggregation mechanism.

Spark

Spark is an open-source distributed computing system written in Scala. The project was started by Ph.D. students from the AMPLab and is an integral part of the Berkeley Data Analytics Stack.

Like Hadoop MapReduce, Spark is designed to run functions over large collections of data, by supporting a simplified set of high-level data processing operations akin to the iterators we've been learning about in class. One of the most common uses of such systems is to implement parallel query processing in high level languages such as SQL. In fact, many recent research and development efforts in Spark have gone towards supporting a scalable and interactive relational database abstraction.

We'll be using, modifying, and studying aspects of Spark in this class to understand key concepts of modern data systems. More importantly you will see that the ideas we're covering in class -- some of which are decades old -- are still very relevant today. Specifically, we will be adding features to Spark SQL.

One key limitation of Spark SQL is that it is currently a main-memory-only system. As part of this class, we will extend it to include some out-of-core algorithms as well.

Scala

Scala is a statically-typed language that supports many different programming paradigms. Its flexibility, power, and portability have become especially useful in distributed-systems research.

Scala resembles Java, but it possesses a much broader set of syntax features to facilitate multiple paradigms. Knowing Java will help you understand some Scala code, but not much of it, and not knowing Scala will prevent you from fully taking advantage of its expressive power. Because you must write code in Scala, we strongly recommend that you acquire at least a passing familiarity with the language.

IntelliJ IDEA tends to be the most commonly used IDE for developing in Spark. IntelliJ is a Java IDE that has a Scala (and vim!) plugin. There are also other options such as Scala-IDE.

You might find the following tutorials to be useful:

Additional Reading for the Interested

Setup

git and GitHub

git is a version control system, helping you track different versions of your code, synchronize them across different machines, and collaborate with others. GitHub is a site which supports this system, hosting it as a service.

If you don't know much about git, we strongly recommend that you familiarize yourself with this system; you'll be spending a lot of time with it! There are many guides to using git online - here is a great one to read.

Setting up your repository and pulling the framework

You should first set up a remote private repository (e.g., spark-homework). GitHub gives private repositories to students (but this may take some time). If you don't have a private repository, think TWICE about checking your code into a public repository, as it will be available for others to check out.

$ cd ~

Clone your personal repository. It should be empty.

$ git clone "https://github.com/xx/yy.git"

Enter the cloned repository, track the course repository, and pull from it.

$ cd yy/
$ git remote add course "https://github.com/mitshubh/cs143-spark.git"
$ git pull course master

NOTE: Please do not be overwhelmed by the amount of code that is here. Spark is a big project with a lot of features. The code that we will be touching will be contained within one specific directory: sql/core/src/main/scala/org/apache/spark/sql/execution/. The tests will all be contained in sql/core/src/test/scala/org/apache/spark/sql/execution/

Push the pulled code to your personal repository.

$ git push origin master

Every time you add some code, you can commit the modifications (after staging them with git add) and push them to the remote repository.

$ git commit -m 'update to homework'
$ git push origin master

Receiving assignment updates

It may be necessary to receive updates to our assignment (even though we try to release them as "perfectly" as possible the first time). Assuming you set up the tracking correctly, you can simply run the following command to receive assignment updates:

$ git pull course master

Searching files in UNIX

The following UNIX command will come in handy when you need to find the location of a file. For example, to find the location of a file named 'DiskHashedRelation.scala' in the current repository:

$ find ./ -name 'DiskHashedRelation.scala' 

Building Spark

Once you have pulled the code, cd into {repo root} and run make compile. The first time you run this command, it should take a while -- sbt will download all the dependencies and compile all the code in Spark (there's quite a bit of code). Once the initial assembly commands finish, you can start your project! (Future builds should not take this long -- sbt is smart enough to only recompile the changed files, unless you run make clean, which will remove all compiled class files.)

Teams

The assignment due date is published on CCLE.

Team formation rules are posted on the class website.

Part A

To complete this part, you will need to support user-defined functions (UDFs) efficiently and reliably in two situations: (i) when all the values fit in memory and (ii) when they do not. As you can guess, hashing will be involved. More precisely, you will work in this project to achieve the following goals:

Assignment Goals

  1. Implement disk hash-partitioning
  2. Implement in-memory UDF caching
  3. Implement hash-partitioned UDF caching

This project will illustrate key concepts in data rendezvous and query evaluation, and you'll get some hands-on experience modifying Spark, which is widely used in the field. In addition, you'll get exposure to Scala, a JVM-based language that is gaining popularity for its clean functional style.

Lastly, there is a lot of code in this directory. DO NOT GET OVERWHELMED!! Please see the note under "Setting up your repository and pulling the framework" to find the directory where the relevant code is located.

Background and Framework

UDFs (User Defined Functions)

SELECT P.name, P.manufacturer, P.price, extract_text(P.image)
  FROM Products P;

Let's look at the above example, which we used earlier. If the input column(s) to a UDF contain many duplicate values, we can improve performance by ensuring that the UDF is only called once per distinct input value, rather than once per row. (For example, in our Products example above, all the different configurations of a particular PC might have the same image.) In this assignment, we will implement this optimization. We'll take it in stages -- first get it working for data that fits in memory, and then later for larger sets that require an out-of-core approach. We will use external hashing as the technique to "rendezvous" all the rows with the same input values for the UDF.

  1. Implement disk-based hash partitioning.
  2. Implement in-memory UDF caching.
  3. Combine the above two techniques to implement out-of-core UDF caching.

If you're interested in the topic, the following paper will be an interesting read (optional).

Project Framework

All the code you will be touching will be in three files -- CS143Utils.scala, basicOperators.scala, and DiskHashedRelation.scala. You might however need to consult other files within Spark or the general Scala APIs in order to complete the assignment thoroughly. Please make sure you look through all the provided code in the three files mentioned above before beginning to write your own code. There are a lot of useful functions in CS143Utils.scala as well as in DiskHashedRelation.scala that will save you a lot of time and cursing -- take advantage of them!

In general, we have defined most (if not all) of the methods that you will need. As before, in this project, you need to fill in the skeleton. The amount of code you will write is not very high -- the total staff solution is fewer than 100 lines of code (not including tests). However, stringing together the right components in a memory-efficient way (i.e., not reading the whole relation into memory at once) will require some thought and careful planning.

Some Terminology Differences

There are some potentially confusing differences between the common terminology, and the terminology used in the SparkSQL code base:

  • The "iterator" concept we normally use is called a "node" in the SparkSQL code -- there are definitions in the code for UnaryNode and BinaryNode. A query plan is called a SparkPlan, and in fact UnaryNode and BinaryNode extend SparkPlan (after all, a single iterator is a small query plan!) You may want to find the file SparkPlan.scala in the SparkSQL source to see the API for these nodes.

  • In some of the comments in SparkSQL, they also use the term "operator" to mean "node". The file basicOperators.scala defines a number of specific nodes (e.g. Sort, Distinct, etc.).

  • Don't confuse the Scala interface Iterator with the iterator concept used otherwise. The Iterator that you will be using in this project is a Scala language feature that you will use to implement your SparkSQL nodes. Iterator provides an interface to Scala collections that enforces a specific API: the next and hasNext functions.
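To make the Scala Iterator API concrete, here is a minimal sketch of a class that implements hasNext and next. The names IteratorSketch and RangeIterator are hypothetical and are not part of the SparkSQL code base.

```scala
object IteratorSketch {
  // Hypothetical iterator yielding the integers from `start` (inclusive)
  // to `end` (exclusive). Only hasNext and next() need to be defined;
  // methods such as map, foreach, and toList come for free.
  class RangeIterator(start: Int, end: Int) extends Iterator[Int] {
    private var current = start

    override def hasNext: Boolean = current < end

    override def next(): Int = {
      if (!hasNext) throw new NoSuchElementException("iterator is exhausted")
      val value = current
      current += 1
      value
    }
  }
}
```

For instance, new IteratorSketch.RangeIterator(0, 3).toList evaluates to List(0, 1, 2).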

Your Task

Disk hash-partitioning

We have provided you skeleton code for DiskHashedRelation.scala. This file has 4 important things:

  • trait DiskHashedRelation defines the DiskHashedRelation interface
  • class GeneralDiskHashedRelation is our implementation of the DiskHashedRelation trait
  • class DiskPartition represents a single partition on disk
  • object DiskHashedRelation can be thought of as an object factory that constructs GeneralDiskHashedRelations

Task #1: Implementing DiskPartition and GeneralDiskHashedRelation

First, you will need to implement the insert, closeInput, and getData methods in DiskPartition for this part. For the former two, the docstrings should provide a comprehensive description of what you must implement. The caveat with getData is that you cannot read the whole partition into memory at once. The reason we are enforcing this restriction is that there is no good way to enforce freeing memory in the JVM, and as you transform data to different forms, there would be multiple copies lying around. As such, having multiple copies of a whole partition would cause things to be spilled to disk and would make us all sad. Instead, you should stream one block into memory at a time.
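The idea of streaming one block at a time can be sketched independently of the skeleton code. In the sketch below, loadChunk is a hypothetical stand-in for reading one block of a partition from disk; it is an assumption for illustration and not the actual DiskPartition API.

```scala
object ChunkStreamSketch {
  // Streams rows chunk by chunk. `loadChunk(i)` is a hypothetical stand-in
  // for reading the i-th block of a partition from disk.
  def streamChunks[T](numChunks: Int, loadChunk: Int => Seq[T]): Iterator[T] =
    new Iterator[T] {
      private var nextChunk = 0
      private var current: Iterator[T] = Iterator.empty

      // Move to the next non-empty chunk. Only one chunk is held at a time,
      // because the previous one becomes unreachable once it is replaced.
      private def advance(): Unit =
        while (!current.hasNext && nextChunk < numChunks) {
          current = loadChunk(nextChunk).iterator
          nextChunk += 1
        }

      override def hasNext: Boolean = {
        advance()
        current.hasNext
      }

      override def next(): T = {
        if (!hasNext) throw new NoSuchElementException("no more rows")
        current.next()
      }
    }
}
```

Chunks are loaded lazily: nothing is read until the first call to hasNext or next, and chunk i + 1 is only loaded after chunk i has been fully consumed.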

At this point, you should be passing the tests in DiskPartitionSuite.scala.

Task #2: Implementing object DiskHashedRelation

Your task in this portion will be to implement phase 1 of external hashing -- using a coarse-grained hash function to stream an input into multiple partition relations on disk. For our purposes, the hashCode method that every object has is sufficient for generating a hash value, and taking the modulo by the number of the partitions is an acceptable hash function.
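As a rough illustration of this hash function, consider the sketch below. It uses in-memory buffers as a stand-in for on-disk partitions (an assumption made to keep the example self-contained), and the names HashPartitionSketch, partitionIndex, and partition are hypothetical. Note that hashCode can be negative, so the sketch uses floorMod rather than the % operator to keep the index in range.

```scala
object HashPartitionSketch {
  // Maps a key to a partition index in [0, numPartitions).
  // hashCode may be negative, and % would then give a negative result,
  // so floorMod is used to always land on a valid index.
  def partitionIndex(key: Any, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)

  // Phase 1 of external hashing: stream the input once and append each row
  // to the partition chosen by the hash function. In-memory builders stand
  // in for on-disk partitions here (assumption).
  def partition[T](input: Iterator[T], numPartitions: Int): Vector[Vector[T]] = {
    val buffers = Vector.fill(numPartitions)(Vector.newBuilder[T])
    input.foreach(row => buffers(partitionIndex(row, numPartitions)) += row)
    buffers.map(_.result())
  }
}
```

Because the partition index depends only on the row's hash code, all rows with equal values end up in the same partition.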

At this point, you should be passing all the tests in DiskHashedRelationSuite.scala.

In-Memory UDF Caching

In this section, we will be dealing with case class CacheProject in basicOperators.scala. You might notice that there are only 4 lines of code in this class and, more importantly, no /* IMPLEMENT THIS METHOD */s. You don't actually have to write any code here. However, if you trace the function call in line 66, you will find that there are two parts of this stack you must implement in order to have a functional in-memory UDF implementation.

Task #3: Implementing CS143Utils methods

For this task, you will need to implement getUdfFromExpressions and the Iterator methods in CachingIteratorGenerator#apply. Please read the docstrings -- especially for apply -- closely before getting started.

After implementing these methods, you should be passing the tests in CS143UtilsSuite.scala.

Hint: Think carefully about why these methods might be a part of the Utils.
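The core idea of in-memory UDF caching can be sketched on plain Scala values. This is a simplified illustration under the assumption that the UDF is an ordinary function K => V; the real CachingIteratorGenerator works on SparkSQL rows and expressions, and the names below are hypothetical.

```scala
import scala.collection.mutable

object UdfCacheSketch {
  // Wraps `udf` so that each distinct input value is evaluated at most once.
  // The cache lives as long as the returned iterator is in use.
  def cachingIterator[K, V](input: Iterator[K], udf: K => V): Iterator[V] = {
    val cache = mutable.HashMap.empty[K, V]
    // getOrElseUpdate only evaluates udf(key) on a cache miss.
    input.map(key => cache.getOrElseUpdate(key, udf(key)))
  }
}
```

With the input "a", "bb", "a", "bb", "ccc" and a UDF that returns the string length, the UDF is called three times rather than five.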

Disk-Partitioned UDF Caching

Now comes the moment of truth! We've implemented disk-based hash partitioning, and we've implemented in-memory UDF caching -- what is sometimes called memoization. Memoization is a very powerful tool in many contexts, but here in databases-land, we deal with larger amounts of data than memoization can handle. If we have more unique values than can fit in an in-memory cache, our performance will rapidly degrade. Thus, we fall back to the time-honored databases tradition of divide-and-conquer. If our data does not fit in memory, then we can partition it to disk once, read one partition in at a time (think about why this works (hint: rendezvous!)), and perform UDF caching, evaluating one partition at a time.

Task #4: Implementing PartitionProject

This final task requires that you fill in the implementation of PartitionProject in basicOperators.scala. All the code that you will need to write is in the generateIterator method. Think carefully about how you need to organize your implementation. You should not be buffering all the data in memory or anything similar to that.
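The overall shape of partition-at-a-time caching can be sketched as follows. The sketch assumes the input has already been hash-partitioned and is exposed as an iterator of partitions; the names are hypothetical and the code operates on plain values rather than SparkSQL rows.

```scala
import scala.collection.mutable

object PartitionedCacheSketch {
  // Evaluates `udf` over already-partitioned input, one partition at a time.
  // A fresh cache is created per partition, so only the distinct values of
  // the current partition are held in memory.
  def cachePerPartition[K, V](partitions: Iterator[Iterator[K]], udf: K => V): Iterator[V] =
    partitions.flatMap { partition =>
      val cache = mutable.HashMap.empty[K, V]
      partition.map(key => cache.getOrElseUpdate(key, udf(key)))
    }
}
```

Since flatMap on an Iterator is lazy, the next partition is only opened once the current one is exhausted. This works because hash partitioning sends all equal inputs to the same partition, so a per-partition cache never misses a value it would have seen elsewhere. Note that the output follows partition order, not the original input order.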

At this point, you should be passing all given tests.

Testing

We have provided you some sample tests in DiskPartitionSuite.scala, DiskHashedRelationSuite.scala, CS143UtilsSuite.scala and ProjectSuite.scala for Part A and in InMemoryAggregateSuite, SpillableAggregationSuite and RecursiveAggregationSuite for Part B. These tests can guide you as you complete this project. However, keep in mind that they are not comprehensive, and you are well advised to write your own tests to catch bugs. Hopefully, you can use these tests as models to generate your own tests.

To run our tests, we have provided a simple Makefile. To run the tests for task 1, run make t1. Correspondingly, for task 2, run make t2, and likewise for the other tasks. make partA will run all the tests for Part A, and make all will run all the tests.

Part B

Assignment Goals

  1. Implement hash-based aggregation
  2. Implement a spillable data structure
  3. Combine the above two techniques to implement hybrid hash aggregation

Project Framework

All the code you will be touching will be in two files -- CS143Utils.scala and SpillableAggregate.scala. You might however need to consult other files within Spark (especially Aggregate.scala) or the general Scala APIs in order to complete the assignment thoroughly.

In general, we have defined most (if not all) of the methods that you will need. As before, in this project, you need to fill in the skeleton. The amount of code you will write is not very high -- the total staff solution is fewer than 100 lines of code (not including tests). However, stringing together the right components in an efficient way will require some thought and careful planning.

Your Task

In memory hash-based aggregation

We have provided you skeleton code for SpillableAggregate.scala. This file has 4 important things:

  • aggregator extracts the physical aggregation from the logical plan
  • aggregatorSchema contains the output schema for the aggregate at hand
  • newAggregatorInstance creates the actual instance of the physical aggregator that will be used during execution
  • generateIterator is the main method driving the computation of the aggregate

Task #5: Implementing the in-memory part of SpillableAggregate

First, you will need to implement the aggregator, aggregatorSchema, and newAggregatorInstance methods in SpillableAggregate.scala for this part. This is a simple exercise; just check Aggregate.scala. However, try to understand the logic, because you will need those methods to implement generateIterator. To complete the implementation of generateIterator at this point, you will need to fill in hasNext, next, aggregate, and the object AggregateIteratorGenerator in CS143Utils. The logic of generateIterator is the following: 1) drain the input iterator into the aggregation table; 2) generate an aggregate iterator using the helper function AggregateIteratorGenerator, properly formatting the aggregate result; and 3) use the Iterator inside generateIterator as the external interface to access and drive the aggregate iterator.
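The drain-then-iterate pattern described above can be sketched on plain key/value pairs. This is a simplified illustration that hard-codes a sum aggregate (an assumption); the real code works with SparkSQL rows and aggregate expressions, and the names below are hypothetical.

```scala
import scala.collection.mutable

object HashAggregateSketch {
  // Step 1: drain the whole input into a hash table keyed by group.
  // Step 2: expose the finished table as an iterator over (group, sum).
  def sumByKey[K](input: Iterator[(K, Long)]): Iterator[(K, Long)] = {
    val table = mutable.HashMap.empty[K, Long]
    input.foreach { case (group, value) =>
      table.update(group, table.getOrElse(group, 0L) + value)
    }
    table.iterator
  }
}
```

No result can be emitted before the input is fully drained, because any later row might still belong to a group that is already in the table.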

No need to spill to disk at this point.

Hybrid hash aggregation

Task #6: Make the aggregate table spill to disk

Your task is first to implement the maybeSpill method in CS143Utils. The next step is to revise your implementation of generateIterator in SpillableAggregate.scala by making the current aggregation table check if it can safely add a new record without triggering the spilling to disk. If the record cannot be added to the aggregation table, it will be spilled to disk. To implement the above logic, you will have to fill in the methods initSpills and spillRecord, and properly modify your implementation of aggregate. In initSpills, remember to set blockSize to 0; otherwise, spilled records will stay in memory and not actually spill to disk!
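The decision of whether to aggregate a record in memory or spill it can be sketched as follows. This sketch makes two simplifying assumptions: the memory check is replaced by a fixed limit on the number of groups (maxGroups), and in-memory buffers stand in for the spill files on disk. All names are hypothetical and do not correspond to the signatures of maybeSpill, initSpills, or spillRecord.

```scala
import scala.collection.mutable

object SpillSketch {
  final case class Result[K](table: Map[K, Long], spills: Vector[Vector[(K, Long)]])

  def aggregateWithSpill[K](
      input: Iterator[(K, Long)],
      maxGroups: Int,
      numSpillPartitions: Int): Result[K] = {
    val table = mutable.HashMap.empty[K, Long]
    // In-memory stand-ins for the on-disk spill partitions (assumption).
    val spills = Vector.fill(numSpillPartitions)(mutable.ArrayBuffer.empty[(K, Long)])

    input.foreach { case (group, value) =>
      if (table.contains(group) || table.size < maxGroups) {
        // The group is already in the table, or there is room for a new one.
        table.update(group, table.getOrElse(group, 0L) + value)
      } else {
        // The table is full and the group is new: spill the raw record to the
        // partition chosen by its hash, so equal groups rendezvous later.
        val index = java.lang.Math.floorMod(group.hashCode, numSpillPartitions)
        spills(index) += ((group, value))
      }
    }
    Result(table.toMap, spills.map(_.toVector))
  }
}
```

Updating an existing group never grows the table, so such records are always aggregated in memory; only records for groups that are not yet in a full table are spilled.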

Task #7: Recursive Aggregation

At this point, the only missing part for having a hybrid-hash aggregate is to recursively aggregate the records previously spilled to disk. If you have implemented the previous task correctly, to finish this task you will only have to take care of the situation in which 1) the input iterator of generateIterator is drained; 2) the aggregate table contains aggregate values for a subset of the groups; and 3) the remaining groups sit on disk in properly partitioned files. The idea now is to clear the current aggregation table, fetch the spilled records partition by partition, and aggregate them in memory. Implement fetchSpill and revise your implementation of hasNext and next in generateIterator. You can assume that the aggregate table for each partition fits in memory (what would you do if we removed this assumption?).
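The final step can be sketched as an iterator that first emits the in-memory results and then aggregates the spilled partitions one at a time. As before, this is a simplified illustration with a hard-coded sum aggregate and plain iterators standing in for spill files; the names are hypothetical and unrelated to the actual fetchSpill signature.

```scala
import scala.collection.mutable

object RecursiveAggregateSketch {
  // Aggregates a single partition fully in memory.
  private def aggregate[K](rows: Iterator[(K, Long)]): Iterator[(K, Long)] = {
    val table = mutable.HashMap.empty[K, Long]
    rows.foreach { case (group, value) =>
      table.update(group, table.getOrElse(group, 0L) + value)
    }
    table.iterator
  }

  // Emits the results already held in memory, then processes each spilled
  // partition in turn, using a fresh table per partition.
  def results[K](
      inMemory: Iterator[(K, Long)],
      spilled: Iterator[Iterator[(K, Long)]]): Iterator[(K, Long)] =
    inMemory ++ spilled.flatMap(partition => aggregate(partition))
}
```

This is correct as long as each group lives in exactly one place: either in the in-memory table or in a single spill partition. Hash partitioning of the spilled records provides the second half of that guarantee.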

Assignment submission

A submission link will be created on CCLE, where you can submit your code by the due date. Please commit a file called project-team.txt, which contains the names and UIDs of the team members. Submit only the zipped repository.

Acknowledgements

Big thanks to Matteo Interlandi.

Good luck!