Parallelism thrills but sometimes it kills (Big Data - Spark) !!
There is a rather popular saying, “Speed thrills but it kills”: sometimes the very thing that gives you wings, that makes you faster, can also become dangerously catastrophic. This applies to our very own Big Data world as well, especially to in-memory processing frameworks like Apache Spark. I recently faced a confusing scenario where following all the norms of the framework (parallelism, a distributed approach) was causing the application to slow down significantly. So I decided to share the details here, which may help you avoid such encounters.
Problem Statement :
There was a requirement to combine multiple dynamically generated dataframes into a single final result containing all the individual data points from each of the dataframes.
Code snippet : (Simplified version)
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.appName("testSpark").master("local").getOrCreate()
import spark.implicits._
val cred_score_df = spark.sparkContext
.parallelize(Seq((0, 35), (1, 41), (2, 51)))
.toDF("id", "cred_score")
val trust_score_df = spark.sparkContext
.parallelize(Seq((0, 23), (1, 44), (2, 52)))
.toDF("id", "trust_score")
val health_score_df = spark.sparkContext
.parallelize(Seq((0, 31), (1, 45), (2, 53)))
.toDF("id", "health_score")
val study_score_df = spark.sparkContext
.parallelize(Seq((0, 31), (1, 45), (2, 53)))
.toDF("id", "study_score")
val intellect_score_df = spark.sparkContext
.parallelize(Seq((0, 31), (1, 45), (2, 53)))
.toDF("id", "intellect_score")
We went through a couple of iterations of solutions and, after a few rough patches, finally arrived at the best possible solution for such problem statements. Each solution has its own advantages and disadvantages.
Solution 1 : (Traditional looping)
We can always take the traditional loop approach: process each dataframe as it becomes available and perform the join operations sequentially.
/*Solution 1*/
def joinDfs(a: DataFrame, b: DataFrame, c: String): DataFrame = {
  val joinDF = a.join(b.select("id", c), Seq("id"))
    .select(a.col("id"), b.col(c).as("score"))
  println(c + " DF count : " + joinDF.count)
  joinDF
}

val cols = Map(
  "trust_score"     -> trust_score_df,
  "health_score"    -> health_score_df,
  "study_score"     -> study_score_df,
  "intellect_score" -> intellect_score_df)

var joinedDF = cred_score_df.select(col("id"), col("cred_score").alias("score"))
// Seed the result with the cred scores so they are not lost from the final union
var finalDF = joinedDF

for (col_name <- cols) {
  joinedDF = joinDfs(joinedDF, col_name._2, col_name._1)
  joinedDF.printSchema()
  finalDF = finalDF.union(joinedDF)
  finalDF.show(false)
}
DAG :
This is easier to implement and a straightforward solution that produces the expected result. But it doesn’t tick all the boxes for the Spark framework:
a. It is a more traditional solution: the dataframes are processed sequentially, creating multiple jobs that take more time, though you can avoid most memory and shuffle related issues.
b. If the dataframes are huge in volume, this still works, but at the expense of processing time, which grows with the number of dataframes.
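One reason Solution 1 spawns so many jobs is the eager `count` inside `joinDfs`, which triggers a full job on every iteration. A minimal sketch of the same pattern with a lazy `foldLeft` (hypothetical names, using only a subset of the dataframes above; since every dataframe here shares the same ids, the join step reduces to renaming and unioning the score columns) defers all work to a single action at the end:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object FoldDemo extends App {
  val spark = SparkSession.builder.appName("foldDemo").master("local[*]").getOrCreate()
  import spark.implicits._

  val cred   = spark.sparkContext.parallelize(Seq((0, 35), (1, 41), (2, 51))).toDF("id", "cred_score")
  val trust  = spark.sparkContext.parallelize(Seq((0, 23), (1, 44), (2, 52))).toDF("id", "trust_score")
  val health = spark.sparkContext.parallelize(Seq((0, 31), (1, 45), (2, 53))).toDF("id", "health_score")

  val cols  = Map("trust_score" -> trust, "health_score" -> health)
  val start = cred.select(col("id"), col("cred_score").alias("score"))

  // No count() inside the loop: Spark builds one logical plan and runs
  // a single job only when an action (show/count) is finally called.
  val finalDF = cols.foldLeft(start) { case (acc, (name, df)) =>
    acc.union(df.select(col("id"), col(name).alias("score")))
  }
  finalDF.show(false) // 9 rows: 3 ids x 3 score sources
}
```

This keeps the sequential structure of Solution 1 but collapses its many small jobs into one.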
Solution 2 :
We can utilize the DataFrame API to push parallelism to its fullest extent, as follows:
/* Solution 2 */
var parallelDF = cred_score_df.select(col("id"), col("cred_score").alias("score"))
val finalDF1 = cols
  .map(x => {
    // Drop the previous "score" before joining, so the renamed score from
    // the incoming dataframe is unambiguous in the joined result
    parallelDF = parallelDF
      .select(col("id"))
      .join(x._2.select(col("id"), col(x._1).as("score")), Seq("id"))
      .distinct()
    println(x._1 + " DF count : " + parallelDF.count)
    parallelDF
  }).reduce(_.union(_))
Lineage :
This makes full use of the DataFrame API and provides maximum parallelism and scalability for large datasets. But it has its limitations:
i. It can seriously blow up the lineage if the number of dataframes being generated pushes the query plan beyond what Spark can comfortably handle. In such cases it can seriously impact the performance of the application.
ii. If the data source of a dataframe has issues that lead to duplicates, the data size can blow up to terabytes that cannot fit in the executors’ memory, causing the application to fail and potentially bringing the cluster down.
iii. The most significant issue with this approach is that the size of the lineage (DAG) can lead to unexpected failure of the application due to Spark’s limitations.
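When the lineage does grow too deep, a common mitigation (not part of the original solution, sketched here with hypothetical names and an example checkpoint path) is to periodically checkpoint the accumulated dataframe, which materializes it to storage and truncates the query plan:

```scala
import org.apache.spark.sql.SparkSession

object CheckpointDemo extends App {
  val spark = SparkSession.builder.appName("lineageDemo").master("local[*]").getOrCreate()
  import spark.implicits._

  // Checkpointed data is written here and the plan is cut at that point
  spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // example path

  var acc = spark.sparkContext.parallelize(Seq((0, 35), (1, 41), (2, 51))).toDF("id", "score")
  for (i <- 1 to 20) {
    val next = spark.sparkContext.parallelize(Seq((0, i), (1, i), (2, i))).toDF("id", "score")
    acc = acc.union(next)
    // Every few iterations, truncate the lineage instead of letting
    // the union chain grow 20 levels deep
    if (i % 5 == 0) acc = acc.checkpoint()
  }
  println(acc.count()) // 3 initial rows + 20 unions of 3 rows each
}
```

The trade-off is extra I/O for each checkpoint, so the interval (every 5 unions here) is a tuning knob, not a fixed rule.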
Conclusion:
So both approaches will do the trick, but you need to scrutinize them carefully before implementing one. Let me give you some guidelines to help you decide on the correct approach :