(A version of this post was originally published on AppsFlyer’s blog. Special thanks to Morri Feldman and Michael Spector from the AppsFlyer data team, who did most of the work solving the problems discussed in this article.)
TL;DR: The combination of Spark, Parquet and S3 (& Mesos) is a powerful, flexible and cost-effective analytics platform (and, incidentally, an alternative to Hadoop). However, making all these technologies gel and play nicely together is not a simple task. This post describes the challenges we (AppsFlyer) faced when building our analytics platform on these technologies, and the steps we took to mitigate them and make it all work.
Spark is shaping up as the leading alternative to Map/Reduce for several reasons, including wide adoption by the different Hadoop distributions, the combination of batch and streaming on a single platform, and a growing library of machine-learning integration (both in terms of included algorithms and the integration with machine-learning languages, namely R and Python). At AppsFlyer, we’ve been using Spark for a while now as the main framework for ETL (Extract, Transform & Load) and analytics. A recent example is the new version of our retention report, which uses Spark to crunch several data streams (> 1TB a day) with ETL (mainly data cleansing) and analytics (a stepping stone towards full click-fraud detection) to produce the report.
One of the main changes we introduced in this report is the move from building on Sequence files to using Parquet files. Parquet is a columnar data format, which is probably the best option today for storing long-term big data for analytics purposes (unless you are heavily invested in Hive, where ORC is the more suitable format). The advantages of Parquet over Sequence files are performance and compression, without losing the benefit of wide support by big-data tools (Spark, Hive, Drill, Tajo, Presto etc.).
One relatively unique aspect of our infrastructure for big data is that we do not use Hadoop (perhaps that’s a topic for a separate post). We use Mesos as a resource manager instead of YARN, and Amazon S3 instead of HDFS as a distributed storage solution. HDFS has several advantages over S3; however, the cost/benefit of maintaining long-running HDFS clusters on AWS vs. using S3 is overwhelmingly in favor of S3.
That said, the combination of Spark, Parquet and S3 posed several challenges for us and this post will list the major ones and the solutions we came up with to cope with them.
Parquet & Spark
Parquet and Spark seem to have been in a love-hate relationship for a while now. On the one hand, the Spark documentation touts Parquet as one of the best formats for analytics of big data (it is) and on the other hand the support for Parquet in Spark is incomplete and annoying to use. Things are surely moving in the right direction but there are still a few quirks and pitfalls to watch out for.
To start on a positive note, Spark and Parquet integration has come a long way in the past few months. Previously, one had to jump through hoops just to be able to convert existing data to Parquet. The introduction of DataFrames to Spark made this process much, much simpler. When the input format is supported by the DataFrame API, e.g. the input is JSON (built-in) or Avro (which isn’t built into Spark yet, but you can use a library to read it), converting to Parquet is just a matter of reading the input format on one side and persisting it as Parquet on the other. Consider for example the following snippet in Scala:
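A minimal sketch of such a conversion, assuming the DataFrame reader and writer API from Spark 1.4 and hypothetical S3 paths (the bucket and prefixes are placeholders, not our real locations):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object JsonToParquet {
  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit.
    val sc = new SparkContext(new SparkConf().setAppName("json-to-parquet"))
    val sqlContext = new SQLContext(sc)

    // Hypothetical locations; replace with real input and output paths.
    val input = "s3a://example-bucket/events/json/"
    val output = "s3a://example-bucket/events/parquet/"

    // The JSON reader infers the schema from the data itself.
    val events = sqlContext.read.json(input)

    // Persist the same DataFrame as Parquet.
    events.write.parquet(output)

    sc.stop()
  }
}
```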
Even when you are handling a format where the schema isn’t part of the data, the conversion process is quite simple as Spark lets you specify the schema programmatically. The Spark documentation is pretty straightforward and contains examples in Scala, Java and Python. Furthermore, it isn’t too complicated to define schemas in other languages. For instance, here (AppsFlyer), we use Clojure as our main development language so we developed a couple of helper functions to do just that. The sample code below provides the details:
The first thing is to extract the data from whatever structure we have and specify the schema we like. The code below takes an event-record and extracts various data points from it into a vector of the form [:column_name value optional_data_type]. Note that the data type is optional since it defaults to string if not specified.
The next step is to use the above mentioned structure to both extract the schema and convert to DataFrame Rows:
Finally we apply these functions over an RDD, convert it to a DataFrame and save it as Parquet:
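For readers who don’t use Clojure, the same three steps can be sketched in Scala. This is an illustrative analogue rather than our helper functions: the `Event` record, the `ColumnSpec` type and all column names are hypothetical, and the code assumes the Spark 1.4 `createDataFrame(rows, schema)` API.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

// Hypothetical event record; stands in for whatever structure holds the raw data.
case class Event(appId: String, installTime: Long, country: String)

object EventsToParquet {
  // One column: name, value and an optional data type that defaults to string.
  case class ColumnSpec(name: String, value: Any, dataType: DataType = StringType)

  // Step 1: extract the data points we care about from a record.
  def columns(e: Event): Seq[ColumnSpec] = Seq(
    ColumnSpec("app_id", e.appId),
    ColumnSpec("install_time", e.installTime, LongType),
    ColumnSpec("country", e.country))

  // Step 2: derive both the schema and the Row from the same column list.
  def schemaOf(cols: Seq[ColumnSpec]): StructType =
    StructType(cols.map(c => StructField(c.name, c.dataType, nullable = true)))

  def rowOf(cols: Seq[ColumnSpec]): Row = Row.fromSeq(cols.map(_.value))

  // Step 3: map over the RDD, build a DataFrame and save it as Parquet.
  def save(sqlContext: SQLContext, events: RDD[Event], path: String): Unit = {
    // Names and types do not depend on the values, so a dummy record is enough.
    val schema = schemaOf(columns(Event("", 0L, "")))
    val rows = events.map(e => rowOf(columns(e)))
    sqlContext.createDataFrame(rows, schema).write.parquet(path)
  }
}
```

Keeping the column list as the single source for both the schema and the rows means the two cannot drift apart when a column is added.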
As mentioned above, things are on the up and up for Parquet and Spark but the road is not clear yet. Some of the problems we encountered include:
- A critical bug in the 1.4 release, where a race condition when writing Parquet files caused massive data loss on jobs (this bug is fixed in 1.4.1, so if you are using Spark 1.4 and Parquet, upgrade yesterday!)
- Filter pushdown optimization, which is turned off by default since Spark still uses Parquet 1.6.0rc3, even though 1.6.0 has been out for a while (it seems Spark 1.5 will use Parquet 1.7.0, so the problem will be solved)
- Parquet is not “natively” supported in Spark; instead, Spark relies on Hadoop support for the Parquet format. This is not a problem in itself, but for us it caused major performance issues when we tried to use Spark and Parquet with S3 (more on that in the next section)
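For the filter pushdown item above, the switch is a Spark SQL setting. A minimal sketch, assuming a Spark 1.4 `SQLContext`; since the feature is off by default for a reason, it is worth testing on your own data before relying on it:

```scala
import org.apache.spark.sql.SQLContext

object FilterPushdown {
  // Turns on Parquet filter pushdown for queries run through this SQLContext.
  def enable(sqlContext: SQLContext): Unit =
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
}
```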
Parquet, Spark & S3
Amazon S3 (Simple Storage Service) is an object storage solution that is relatively cheap to use. It does have a few disadvantages vs. a “real” file system; the major one is eventual consistency, i.e. changes made by one process are not immediately visible to other applications. (If you are using Amazon’s EMR you can use EMRFS “consistent view” to overcome this.) However, if you understand this limitation, S3 is still a viable input and output source, at least for batch jobs.
As mentioned above, Spark doesn’t have a native S3 implementation and relies on Hadoop classes to abstract the data access to Parquet. Hadoop provides 3 file system clients to S3:
- S3 block file system (URI scheme of the form “s3://..”), which only works on EMR (Edited: 12/8/2015, thanks to Ewan Leith)
- S3 Native Filesystem (“s3n://..” URIs) – download a Spark distribution that supports Hadoop 2.* and up if you want to use this (tl;dr – you don’t)
- S3a – a replacement for S3n that removes some of the limitations and problems of S3n. Download “Spark with Hadoop 2.6 and up” support to use this one (tl;dr – you want this but it needs some work before it is usable)
When we used Spark 1.3 we encountered many problems when we tried to use S3, so we started out using s3n. It worked for the most part, i.e. we got jobs running and completing, but a lot of them failed with various read timeout and unknown host exceptions. Looking at the tasks within the jobs, the picture was even grimmer, with high percentages of failures that pushed us to increase timeouts and retries to ridiculous levels. When we moved to Spark 1.4.1, we took another stab at s3a, and this time around we got it to work. The first thing we had to do was to set both spark.executor.extraClassPath and spark.driver.extraClassPath to point at the aws-java-sdk and hadoop-aws jars, since apparently both are missing from the “Spark with Hadoop 2.6” build. Naturally we used the 2.6 version of these files, but then we were hit by a little problem: the Hadoop 2.6 AWS implementation has a bug that causes it to split S3 files in unexpected ways (e.g. a job over 400 files ran with 18 million tasks). Luckily, using version 2.7.0 of the Hadoop AWS jar instead of the 2.6 one solved this problem. With all that set, s3a prefixes work without a hitch (and provide better performance than s3n).
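The classpath setup described above might look roughly like the sketch below. The jar locations are hypothetical, and the credential property names (`fs.s3a.access.key`, `fs.s3a.secret.key`) are the ones I understand the Hadoop 2.7 s3a client to use, so verify them against the Hadoop version you actually deploy.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object S3aSetup {
  // Hypothetical jar locations; point these at wherever the jars really live.
  val awsJars: String = Seq(
    "/opt/jars/aws-java-sdk.jar",
    "/opt/jars/hadoop-aws-2.7.0.jar").mkString(":")

  def conf(): SparkConf = new SparkConf()
    .setAppName("s3a-example")
    .set("spark.executor.extraClassPath", awsJars)
    // In client mode the driver JVM is already running when this code executes,
    // so the driver entry usually belongs in spark-defaults.conf or on the
    // spark-submit command line instead.
    .set("spark.driver.extraClassPath", awsJars)

  def context(): SparkContext = {
    val sc = new SparkContext(conf())
    // Credentials are read from the environment purely for illustration.
    sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
    sc
  }
}
```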
Finding the right S3 Hadoop library contributed to the stability of our jobs, but regardless of the S3 library (s3n or s3a) the performance of Spark jobs that use Parquet files was still abysmal. When looking at the Spark UI, the actual work of handling the data seemed quite reasonable, but Spark spent a huge amount of time before actually starting the work, and again after the job was “completed” before it actually terminated. We like to call this phenomenon the “Parquet Tax.”
Obviously we couldn’t live with the “Parquet Tax”, so we delved into the log files of our jobs and discovered several issues. The first one has to do with startup times of Parquet jobs. The people who built Spark understood that schemas can evolve over time, and provide a nice feature for DataFrames called “schema merging.” If you look at the schema in a big data lake/reservoir (or whatever it is called today) you can definitely expect it to evolve over time. However, if you look at a directory that is the result of a single job, there is no difference in the schema… It turns out that when Spark initializes a job, it reads the footers of all the Parquet files to perform the schema merging. All this work is done from the driver before any tasks are allocated to the executors and can take long minutes, even hours (e.g. we have jobs that look back at half a year of install data). It isn’t documented, but looking at the Spark code you can override this behavior by specifying mergeSchema as false:
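A minimal sketch of passing that option, assuming the Spark 1.4 DataFrame reader; the helper name and the path argument are illustrative:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

object ReadWithoutMerge {
  // Reads a Parquet directory while asking Spark to skip schema merging,
  // so the driver does not have to read the footer of every file.
  def read(sqlContext: SQLContext, path: String): DataFrame =
    sqlContext.read.option("mergeSchema", "false").parquet(path)
}
```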
and in Clojure:
Note that this doesn’t work in Spark 1.3. In Spark 1.4 it works as expected, and in Spark 1.4.1 it causes Spark to look only at the _common_metadata file, which is not the end of the world since it is a small file and there’s only one of these per directory. However, this brings us to another aspect of the “Parquet Tax”: the “end of job” delays.
Turning off schema merging and controlling the schema used by Spark helped cut down the job start-up times but, as mentioned, we still suffered from long delays at the end of jobs. We already knew of one Hadoop<->S3 related problem when using text files. Hadoop, being built around immutable files, first writes files to a temp directory and then copies them over to their final location. That works on S3, but the copy operation is very expensive. For text files, Databricks created DirectOutputCommitter (probably for their Spark SaaS offering). Replacing the output committer for text files is fairly easy: you just need to set “spark.hadoop.mapred.output.committer.class” on the Spark configuration, e.g.:
A similar solution exists for Parquet and, unlike the solution for text files, it is even part of the Spark distribution. However, to make things complicated, you have to configure it on the Hadoop configuration and not on the Spark configuration. To get the Hadoop configuration you first need to create a Spark context from the Spark configuration, call hadoopConfiguration on it and then set “spark.sql.parquet.output.committer.class” as in:
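As a rough sketch, the setting goes on the `SparkConf` before the context is created. The committer class name below is a placeholder: use the fully qualified name of whichever DirectOutputCommitter implementation is on your classpath.

```scala
import org.apache.spark.SparkConf

object TextCommitterConf {
  // "com.example.DirectOutputCommitter" is a hypothetical class name.
  def conf(): SparkConf = new SparkConf()
    .setAppName("direct-text-output")
    .set("spark.hadoop.mapred.output.committer.class",
         "com.example.DirectOutputCommitter")
}
```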
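A minimal sketch of that sequence. The fully qualified class name is an assumption: it is where I believe DirectParquetOutputCommitter lives in Spark 1.4, and the package may differ in other releases, so check it against your Spark jars.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ParquetCommitterSetup {
  def context(conf: SparkConf): SparkContext = {
    // The Spark context has to exist before the Hadoop configuration is reachable.
    val sc = new SparkContext(conf)
    // Assumed class location for Spark 1.4; verify for your version.
    sc.hadoopConfiguration.set(
      "spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc
  }
}
```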
Using the DirectParquetOutputCommitter provided a significant reduction in the “Parquet Tax”, but we still found that some jobs were taking a very long time to complete. Again, the culprits were the file system assumptions that Spark and Hadoop hold. Remember the “_common_metadata” file Spark looks at at the onset of a job? Well, Spark spends a lot of time at the end of the job creating both this file and an additional _metadata file with additional info from the files that are in the directory. Again, this is all done from one place (the driver) rather than being handled by the executors. When the job results in small files (even when there are a couple of thousand of those) the process takes a reasonable time. However, when the job results in larger files (e.g. when we ingest a full day of application launches) this takes upward of an hour. As with mergeSchema, the solution is to manage metadata manually, so we set “parquet.enable.summary-metadata” to false (again on the Hadoop configuration) and generate the _common_metadata file ourselves (for the large jobs).
To sum up, Parquet and especially Spark are works in progress. Making cutting-edge technologies work for you can be a challenge and requires a lot of digging. The documentation is far from perfect at times, but luckily all the relevant technologies are open source (even the Amazon SDK), so you can always dive into the bug reports, code etc. to understand how things actually work and find the solutions you need. Also, from time to time you can find articles and blog posts that explain how to overcome the common issues in technologies you are using. I hope this post clears up some of the complications of integrating Spark, Parquet and S3, which are, at the end of the day, all great technologies with a lot of potential.
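Disabling the summary files is a one-line change on the same Hadoop configuration. A minimal sketch, assuming an existing `SparkContext`; generating the _common_metadata file afterwards is left out here:

```scala
import org.apache.spark.SparkContext

object SummaryMetadata {
  // Stops the driver from writing Parquet summary files at the end of a job.
  def disable(sc: SparkContext): Unit =
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
}
```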
Image by xkcd. Licensed under creative-commons 2.5
I’ve found your post on apps flyer while searching for a solution to a problem I have encountered while using Spark, Parquet and S3.
I seem to be getting an error “File already exists” when writing the data to S3.
What I assume happened is that for some reason a single task failed and did not rollback the save and thus the following errors occurred.
Have you encountered any similar problems? What is the solution for handling failed writes without crashing the entire job?
Thanks a bunch,
Thanks for the thorough post! Extremely helpful.
Has the state of spark + s3 + parquet improved with spark 2.0?
My team is considering this type of setup and curious to hear if ease and/or speed has improved.
Grant: funny you just asked this; Not really.
* If you build spark with the SPARK-7481 PR merged in, you get maven adding all the artifacts you need, including transitive dependencies, and excluding the stuff you don’t need. That’s not been merged in though: reviews, comments and asking others to test it would really help there.
* Hadoop 2.7.1 is the minimum Hadoop version where S3A can really be used. Do make sure you are using Hadoop 2.7.x or later
* Hadoop 2.8 S3A (see HADOOP-11694 ) has some really superb speedups on data input, with HADOOP-13203 deserving a special mention, along with other seek optimisations. The ORC and parquet formats both seek around a lot, and the existing S3 client used to break the HTTP connection to move around the object. Now the S3A client, if started with fadvise=random, will read in blocks of data, and reuse the same HTTP connection for forward or backward seeks. It’s significantly better. If you are thinking of building spark with the PR I mentioned earlier, try doing it with a local build of hadoop-2.8 to see a tangible improvement.
* There’s more work going on about tuning Spark against object stores, driven by profiling things and then working out where the problem lies. Being open source, all contributions there are welcome. That includes finding problems and limits on the existing code.