You’ve spent hours, no, days getting your Spark cluster up and running (maybe it’s on YARN, maybe Mesos, or even just standalone). It’s a shiny thing with over 50 instances running in AWS, burning money at a furious pace. You run your first ETL job and…
wait, it’s only using ONE EXECUTOR? WHY IN HEAVEN’S NAME $#$%$%@$%^?
You start messing around with spark.default.parallelism, going so far as to add explicit parallelism parameters to all your RDD calls… and yet… nothing.
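(For the record, those two knobs look something like this; a minimal sketch, assuming you build your own SparkContext, with an invented app name and an illustrative value of 200:)

import org.apache.spark.{SparkConf, SparkContext}

// Knob 1: set the default up front, before the context is created.
val conf = new SparkConf()
  .setAppName("parallelism-demo")           // hypothetical app name
  .set("spark.default.parallelism", "200")  // illustrative value
val sc = new SparkContext(conf)

// Knob 2: pass an explicit parallelism argument per RDD call.
val rdd = sc.parallelize(Seq(1, 2, 3), 200)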
So here’s what could be going wrong. When you make an RDD, Spark splits the RDD into partitions based on either the spark.default.parallelism value or a user-supplied value:
(spark.default.parallelism = 2)
scala> sc.parallelize(Seq(1,2,3)).toDebugString
res12: String = (2) ParallelCollectionRDD[4] at parallelize at <console>:22 []

scala> sc.parallelize(Seq(1,2,3), 200).toDebugString
res13: String = (200) ParallelCollectionRDD[5] at parallelize at <console>:22 []
Let’s load in a text file!
scala> sc.textFile("CHANGES.txt",200).toDebugString
res15: String =
(200) MapPartitionsRDD[9] at textFile at <console>:22 []
 |  CHANGES.txt HadoopRDD[8] at textFile at <console>:22 []
Well, that looks fine. But maybe you’re not loading in a simple text file. Maybe, just maybe, you’re storing your input files in a compressed format (perhaps to save space on S3).
scala> sc.textFile("C.gz",200).toDebugString
(1) MapPartitionsRDD[13] at textFile at <console>:22 []
 |  C.gz HadoopRDD[12] at textFile at <console>:22 []
Eh? We’ve asked for 200 partitions, and we’ve only got one. What’s wrong? Well, you’ve probably already guessed - Spark can split a text file into a bunch of lines with no trouble, and the cores can operate on those bunches separately. But it’s not magic; it can’t split a GZ file and somehow magically decompress chunks of binary. Hence one partition, one core.
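By the way, if you’d rather not squint at debug strings, you can ask the RDD for its partition count directly; a quick check in the same spirit (the output below is what you’d expect for the same C.gz file):

scala> sc.textFile("C.gz", 200).partitions.length
res16: Int = 1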
Is there anything that can be done? Well, you could look into storing the files uncompressed. It’s the easiest solution! But not always the most practical. Or you could take a look at LZO compression, which uses an encoding that can be split across a cluster (at the cost of a lower compression ratio). But maybe you can’t control the choice of compression, either. Enter repartition:
scala> sc.textFile("C.gz").repartition(100).toDebugString
res38: String =
(100) MapPartitionsRDD[43] at repartition at <console>:22 []
 |  CoalescedRDD[42] at repartition at <console>:22 []
 |  ShuffledRDD[41] at repartition at <console>:22 []
 +-(1) MapPartitionsRDD[40] at repartition at <console>:22 []
    |  MapPartitionsRDD[39] at textFile at <console>:22 []
    |  C.gz HadoopRDD[38] at textFile at <console>:22 []
Of course, you’ve spotted the ShuffledRDD in the lineage above. After all, if you want the information spread around the cluster… you are actually going to have to spread it around the cluster. This is likely to cause a performance hit at the start of a Spark job, but that overhead may well be worth incurring if the rest of the processing is distributed more evenly. Test, evaluate, and pick whatever takes your fancy!
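To make the trade-off concrete, here’s a minimal end-to-end sketch (a hypothetical job reusing the C.gz file and the 100-partition figure from above; tune both to your own data and core count):

// Gzip isn’t splittable, so this arrives as a single partition.
val lines = sc.textFile("C.gz")

// Pay the shuffle cost once, up front, to spread the data out.
val spread = lines.repartition(100)

// Everything from here on runs across 100 partitions instead of 1.
val counts = spread
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.take(10).foreach(println)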