Getting started with Spark (now shipping inside CDH 5) is easy using this simple example.
Apache Spark is a general-purpose cluster computing framework that, like MapReduce in Apache Hadoop, offers powerful abstractions for processing large datasets. For reasons of performance, functionality, and API design, Spark is already becoming more popular than MapReduce for certain types of workloads. (For more background about Spark, read this post.)
In this how-to, you’ll learn how to write, compile, and run a simple Spark program written in Scala on CDH 5 (in which Spark ships and is supported by Cloudera). The full code for the example is hosted at https://github.com/sryza/simplesparkapp.
Writing
Our example app will be a souped-up version of WordCount, the classic MapReduce example. In this version of WordCount, the goal is to learn the distribution of letters in the most popular words in our corpus. That is, we want to:
Read an input set of text documents
Count the number of times each word appears
Filter out all words that show up fewer than a million times
For the remaining set, count the number of times each letter occurs
In MapReduce, this would require two MapReduce jobs, as well as persisting the intermediate data to HDFS in between them. In contrast, in Spark, you can write a single job in about 90 percent fewer lines of code.
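To make the four steps concrete, here is a minimal sketch that runs them on ordinary Scala collections rather than on a cluster. It is not the example app itself: LetterCountSketch and letterCounts are hypothetical names, the threshold is a parameter instead of a fixed million, and the sketch assumes that each popular word contributes its letters once, regardless of how often the word appears.

```scala
object LetterCountSketch {
  // Hypothetical helper: letter frequencies across the words that occur
  // at least `threshold` times in `documents` (one document per string).
  def letterCounts(documents: Seq[String], threshold: Int): Map[Char, Int] = {
    // 1. Split each document into words.
    val words = documents.flatMap(_.split(" ")).filter(_.nonEmpty)

    // 2. Count the number of times each word appears.
    val wordCounts: Map[String, Int] =
      words.groupBy(identity).map { case (word, occurrences) => (word, occurrences.size) }

    // 3. Keep only the words that appear at least `threshold` times.
    val popular = wordCounts.filter { case (_, count) => count >= threshold }

    // 4. Count how often each letter occurs in the remaining words.
    //    Assumption: every distinct word is counted once.
    popular.keys.toSeq
      .flatMap(word => word.toList)
      .groupBy(identity)
      .map { case (letter, occurrences) => (letter, occurrences.size) }
  }
}
```

For example, LetterCountSketch.letterCounts(Seq("a ab ab", "ab b a"), 2) keeps the words "a" and "ab" and returns Map('a' -> 2, 'b' -> 1). On a cluster, the same shape would typically be expressed with Spark RDD operations such as flatMap, map, reduceByKey, and filter.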
Our input is a huge text file where each line contains all the words in a document, stripped of punctuation. The full Scala program looks like this:
Spark uses “lazy evaluation”, meaning that transformations don’t execute on the cluster until an “action” operation is invoked. Examples of action operations are collect, which pulls data to the client, and saveAsTextFile, which writes data to a filesystem like HDFS.
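Lazy evaluation is easier to picture with a small local analogy. The sketch below uses a plain Scala Iterator, not Spark: map plays the role of a transformation and toList plays the role of an action. LazySketch and demo are hypothetical names, and the counter exists only to show when the mapping function actually runs.

```scala
object LazySketch {
  // Returns (evaluations before forcing, evaluations after forcing, result).
  def demo(): (Int, Int, List[Int]) = {
    var evaluations = 0

    // "Transformation": describes the work, but Iterator.map is lazy,
    // so the function body has not run for any element yet.
    val doubled: Iterator[Int] = Iterator(1, 2, 3).map { x =>
      evaluations += 1
      x * 2
    }
    val before = evaluations

    // "Action": materializing the result forces the computation.
    val result = doubled.toList

    (before, evaluations, result)
  }
}
```

Calling LazySketch.demo() returns (0, 3, List(2, 4, 6)): nothing is computed when map is called, and all three elements are processed only once the result is requested.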
It’s worth noting that in Spark, the definition of “reduce” is slightly different from that in MapReduce. In MapReduce, a reduce function call accepts all the records corresponding to a given key. In Spark, the function passed to reduce or reduceByKey accepts just two arguments, so if it is not both associative and commutative, the result can vary with how the data is partitioned and combined. A positive consequence is that Spark knows it can always apply a combiner. Based on that definition, the Spark equivalent of MapReduce’s reduce is similar to a groupBy followed by a map.
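The difference between the two styles of reduce can be sketched with local collections. This is an illustration under stated assumptions, not Spark code: ReduceSketch, partitionedReduce, and reduceAllValues are hypothetical names, and a nested Seq stands in for the partitions of a distributed dataset.

```scala
object ReduceSketch {
  // Two-argument reduce: each partition is reduced on its own, then the
  // partial results are combined with the same function.
  // Assumes at least one non-empty partition.
  def partitionedReduce(partitions: Seq[Seq[Int]])(f: (Int, Int) => Int): Int =
    partitions.filter(_.nonEmpty).map(_.reduceLeft(f)).reduceLeft(f)

  // MapReduce-style reduce: group by key, then hand the function
  // every value for that key at once (a groupBy followed by a map).
  def reduceAllValues[K, V, R](records: Seq[(K, V)])(f: Seq[V] => R): Map[K, R] =
    records.groupBy(_._1).map { case (key, pairs) => (key, f(pairs.map(_._2))) }
}
```

With addition, partitionedReduce gives 10 for both Seq(Seq(1, 2), Seq(3, 4)) and Seq(Seq(1), Seq(2, 3, 4)). With subtraction, which is not associative, the same two partitionings give 0 and 6, so the answer depends on how the data happens to be split.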
For those more comfortable with Java, here’s the same program using Spark’s Java API:
Because Java versions before 8 don’t support anonymous functions, the program is considerably more verbose, but it still requires a fraction of the code needed in an equivalent MapReduce program.
Compiling
We’ll use Maven to compile our program. Maven expects a specific directory layout that informs it where to look for source files. Our Scala code goes under src/main/scala, and our Java code goes under src/main/java. That is, we place SparkWordCount.scala in the src/main/scala/com/cloudera/sparkwordcount directory and JavaWordCount.java in the src/main/java/com/cloudera/sparkwordcount directory.
Maven also requires you to place a pom.xml file in the root of the project directory that tells it how to build the project. A few noteworthy excerpts are included below.
To compile Scala code, include:
which requires adding the scala-tools plugin repository:
Then, include Spark and Scala as dependencies:
Finally, to generate our app jar, simply run:
It will show up in the target directory as sparkwordcount-0.0.1-SNAPSHOT.jar.
Running
Running your Spark application is like running any Java program: you include the application jar and its dependencies in the classpath and pass the name of the main class. In a CDH installation, the Spark and Hadoop jars are included on every node. Some will recommend packaging these dependencies inside your Spark application jar itself, but Cloudera recommends referencing the locally installed jars. Doing so ensures that the client uses the same versions of these jars as the server, and means you don’t need to recompile apps when you upgrade the cluster.
The following includes local Hadoop and Spark jars in the classpath and then runs the application. Before running, place the input file into a directory on HDFS. The repository supplies an example input file in its “data” directory.
Spark’s trunk contains a script called spark-submit that abstracts away the pain of building the classpath. Its inclusion in an upcoming release will make launching an application much easier.
-Dspark.master specifies the cluster against which to run the application; local will run all tasks in the same local process. To run against a Spark standalone cluster instead, include a URL containing the master’s address (such as spark://masterhost:7077). To run against a YARN cluster, include yarn-client — Spark will determine the YARN ResourceManager’s address from the YARN configuration file.
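A -D flag on the java command line becomes a JVM system property, which is how the application sees the value. The sketch below shows that mechanism and spells out the three cases described above. MasterSketch, masterFromSystemProperties, and describeMaster are hypothetical helpers written for illustration; they are not part of Spark and do not reflect how Spark parses the value internally.

```scala
object MasterSketch {
  // A flag such as -Dspark.master=local surfaces as a JVM system property.
  def masterFromSystemProperties(): Option[String] =
    sys.props.get("spark.master")

  // Hypothetical helper: describes the three forms mentioned in the text.
  def describeMaster(master: String): String = master match {
    case "local" =>
      "run all tasks in the same local process"
    case m if m.startsWith("spark://") =>
      s"connect to the standalone master at ${m.stripPrefix("spark://")}"
    case "yarn-client" =>
      "run on YARN; the ResourceManager address comes from the YARN configuration"
    case other =>
      s"not covered by this sketch: $other"
  }
}
```

For instance, MasterSketch.describeMaster("spark://masterhost:7077") reports a connection to the standalone master at masterhost:7077.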
The output of the program should look something like this:
Congratulations, you have just run a simple Spark application in CDH 5. Happy Sparking!
Sandy Ryza is an engineer on the data science team at Cloudera. He is an Apache Hadoop committer and recently led Cloudera’s Spark development.