(Edit 10/8/2015 : A lot has changed in the last few months – you may want to check out my new post on Spark, Parquet & S3 which details some of the changes)
One of the projects we’re currently running in my group (Amdocs’ Technology Research) is an evaluation of the current state of different options for reporting on top of and near Hadoop (I hope I’ll be able to publish the results when we have them). Anyway, part of the preparations for the benchmark includes ingesting a lot of events (CDRs) into the system and creating different aggregations on top of them. For instance, for voice call billing events we create yearly, monthly, weekly, daily and hourly aggregations on the subscriber level, which include measures like: count of calls, average duration, sum of pricing, median balance, hourly distribution of calls, popular destinations etc.
We are using Spark to do the ingestion, and I thought there are two interesting aspects I can share which I haven’t seen many examples of on the internet, namely:
- doing multiple aggregations, i.e. aggregations that go beyond the word-count level
- Writing to a Hadoop output format (Parquet in the example)
I created a minimal example, which uses a simple, synthesized input and demonstrates these two issues – you can get the complete code for that on GitHub. The rest of this post will highlight some of the points from the example.
Let’s start with the core Spark code, which is simple enough:
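Roughly, the flow looks like the sketch below – the Call class, its parse method and the aggregate helper are illustrative stand-ins, and the full listing is in the GitHub repo:

```scala
// Sketch only – class and helper names are illustrative, not the original listing
val lines = sc.textFile("hdfs:///data/calls.csv")        // read the CSV as a plain text file

val calls = lines.map(Call.parse).cache()                 // parse each line into a Call; cache for reuse

val hourlyKeyed = calls.map(c => (getHourly(c), c))       // build the composite "hourly" key by hand
val weeklyKeyed = calls.map(c => (getWeekly(c), c))       // same for the weekly dimensions

val hourlyGrouped = hourlyKeyed.groupByKey()              // (key, Iterable[Call]) per subscriber/time bucket
val weeklyGrouped = weeklyKeyed.groupByKey()

val hourlyAggregated = hourlyGrouped.map { case (k, cs) => (k, aggregate(cs)) }   // back to pair RDDs
val weeklyAggregated = weeklyGrouped.map { case (k, cs) => (k, aggregate(cs)) }
```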
- line 1 reads the CSV as a text file
- line 3 does a simple parsing of the file, turning each line into a class. The parsed RDD is cached since we iterate over it multiple times (once for each aggregation)
- lines 5,6 group by multiple keys. The nice way to do that would be SparkSQL, but the version I used (1.0.2) had very limited and undocumented SQL support (I had to go to the code). This should be better in future versions. If you still use “regular” Spark, I haven’t found a better way to group on multiple fields than creating the composite key by hand, which is what the getHourly and getWeekly methods do (create a string with the year, month, etc. dimensions) – see the sketch after this list
- lines 11,12 do the aggregations themselves (which we’ll look into next). The output is again pair RDDs. This seems pointless; however, it turns out you need pair RDDs if you want to save to Hadoop formats (more on that later)
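Building the composite keys by hand can be as simple as concatenating the dimensions into a string – a sketch, with illustrative field names:

```scala
// Sketch – the Call fields used here (subscriber, year, month, day, hour, week) are assumptions
def getHourly(c: Call): String =
  s"${c.subscriber}-${c.year}-${c.month}-${c.day}-${c.hour}"

def getWeekly(c: Call): String =
  s"${c.subscriber}-${c.year}-${c.week}"
```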
Aggregations
Once we do the group by key (lines 5,6 above) we have a collection of Call records (Iterable[Call]) per key. Now, Scala has a lot of nifty functions to transform Iterables (map, flatMap, foldLeft etc.), however we have a long list of aggregations to compute, the Calls collection can get rather big (e.g. for yearly aggregates) and we have lots and lots of collections to iterate (terabytes and terabytes). So instead of using all these features I opted for the more traditional, Java-like aggregation, which, while being pretty ugly, minimizes the number of iterations I make over the data.
You can see the result below (if you have a nicer-looking, yet still efficient, idea I’d be happy to hear about it):
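Something along these lines – a sketch of the single-pass approach, where the CallAggregate type and the Call fields (duration, price, hour, destination) are illustrative assumptions:

```scala
// Sketch – CallAggregate and the Call fields are illustrative, not the original code
case class CallAggregate(count: Int,
                         avgDuration: Double,
                         sumPricing: Double,
                         hourlyDistribution: Array[Int],
                         popularDestinations: Seq[String])

def aggregate(calls: Iterable[Call]): CallAggregate = {
  var count = 0
  var totalDuration = 0.0
  var totalPricing = 0.0
  val hourCounts = new Array[Int](24)
  val destinations = scala.collection.mutable.HashMap.empty[String, Int]

  // one pass over the data instead of chaining map/foldLeft/groupBy transformations
  for (call <- calls) {
    count += 1
    totalDuration += call.duration
    totalPricing += call.price
    hourCounts(call.hour) += 1
    destinations(call.destination) = destinations.getOrElse(call.destination, 0) + 1
  }

  CallAggregate(
    count = count,
    avgDuration = if (count == 0) 0.0 else totalDuration / count,
    sumPricing = totalPricing,
    hourlyDistribution = hourCounts,
    popularDestinations = destinations.toSeq.sortBy(-_._2).take(5).map(_._1))
}
```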
Save to Parquet
The end result of doing the aggregations is a hierarchical structure – a list of simple measures (averages, sums, counts etc.) but also a phone book, which has an array of pricings, and an hours breakdown which is also an array. I decided to store that in the Parquet/ORC formats, which are efficient for queries in Hadoop (by Hive/Impala, depending on the Hadoop distribution you are using). For the purpose of the example I included the code to persist to Parquet.
If you can use SparkSQL then support for Parquet is built in and you can do something as simple as:
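A minimal sketch, assuming Spark SQL 1.0.x and an RDD of case classes (the output path is illustrative):

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD      // implicitly turns an RDD of case classes into a SchemaRDD

// weeklyAggregated is a plain RDD of case class instances
weeklyAggregated.saveAsParquetFile("hdfs:///aggregations/weekly.parquet")
```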
You might have noticed that weeklyAggregated is a simple RDD and not a pair RDD, since pairs aren’t needed here. Unfortunately for me, I had to use Spark 0.9 (our lab is currently on Cloudera 5.0.1) – and in the more general case of writing to other Hadoop file formats you can’t use this trick anyway.
One point specific to Parquet is that you can’t write to it directly – you have to use a “writer” class, and Parquet has Avro, Thrift and ProtoBuf writers available. I thought that was going to be nice and easy, so I looked for a Scala library to serialize to one of these formats and chose Scalavro (which I used in the past). It turns out that, while the serialization is Avro compatible, it does not produce the standard Avro classes the Avro writer for Parquet expects. No problem, I thought, I’ll just take another library that works with Thrift – Twitter has one, it is called Scrooge, works nice and all – but again it is not completely standard (it doesn’t inherit from TBase). Oh well, maybe protobuf will do the job, so I tried ScalaBuff; alas, this didn’t work well either (it inherits from LightMessage and not Message, as expected by the writer) – I ended up using the plain Java generator for protobuf. This further uglified the aggregation logic, but at least it got the job done. The output from the aggregation is a pair RDD of protobuf-serializable aggregations.
So, why the pair RDD? It turns out all the interesting save functions that can use a Hadoop file format only exist on pair RDDs and not on regular ones. If you know this little trick, the code is not very complicated:
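A sketch of what that can look like with parquet-protobuf, assuming a protobuf-generated class named CallAggregateProto and weeklyAggregated being an RDD[(Void, CallAggregateProto)] (both names are illustrative):

```scala
import org.apache.hadoop.mapreduce.Job
import parquet.proto.ProtoParquetOutputFormat

// configure the Parquet writer with the protobuf class it should expect
val job = new Job()
ProtoParquetOutputFormat.setProtobufClass(job, classOf[CallAggregateProto])

weeklyAggregated.saveAsNewAPIHadoopFile(
  "hdfs:///aggregations/weekly",                            // output directory
  classOf[Void],                                            // key class – not used by the Parquet format
  classOf[CallAggregateProto],                              // class of the records being written
  classOf[ProtoParquetOutputFormat[CallAggregateProto]],    // the Hadoop output format (Parquet)
  job.getConfiguration)                                     // job configuration holding the writer settings
```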
The first two lines in the snippet above configure the writer and are specific to Parquet. The last line is the one that does the actual save to file – it specifies the output directory, the key class (Void, since we don’t need it with the Parquet format), the class for the records, the Hadoop output format class (Parquet in our case) and lastly a job configuration.
To summarize: when you want to do more than a simple toy example, the code may end up more complicated than the concise examples you see online. When working with lots and lots of data, the number of iterations you make over the data also begins to matter, and you need to pay attention to that.
Spark is a very powerful library for working on big data; it has a lot of components and capabilities. However, its biggest weakness (in my opinion, anyway) is its documentation. It makes it easy to start working with the platform, but when you want to do something a little more interesting you are left to dig around without proper directions. I hope this post will help some others when they do their digging around :)
Lastly, again I remind you that you can get the complete code for that on GitHub.
Great post. I agree with your assessment that Spark documentation is missing more complex examples. I think that the people at Databricks have also realized this and have tried to partially remedy it with this blog post:
http://databricks.com/blog/2014/09/23/databricks-reference-applications.html
[…] note, Spark and Parquet integration has come a long way in the past few months. Previously, one had to jump through hoops just to be able to convert existing data to Parquet. The introduction of DataFrames to Spark made this process much, much simpler. When the input […]
Some minor typos: after the 1st code listing you mention “lines 5,6 group by multiple keys” – it should be “lines 8,9 group by multiple keys”; lines 5 and 6 just create the PairRDDs. In later portions of the article too, lines 5,6 have been mentioned for the groupBy, while the groupBy happens on lines 8,9.