Parallelism thrills but sometimes it kills (Big Data - Spark) !!


There is a rather popular saying, "Speed thrills but it kills": the very thing that gives you wings and makes you faster can also turn dangerously catastrophic. This applies to our very own Big Data world as well, especially to in-memory processing frameworks like Apache Spark. I recently faced a confusing scenario where following all the norms of the framework (parallelism, a distributed approach) was causing the application to slow down significantly. So I decided to share the details here, which can help you avoid such encounters.

Problem Statement :

There was a requirement to combine multiple dynamically generated dataframes into a single final result that brings together the individual data points from each of those dataframes.

Code snippet : (Simplified version)

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder.appName("testSpark").master("local").getOrCreate()
    import spark.implicits._

    // Sample score dataframes; in the real use case these are generated dynamically.
    val cred_score_df = spark.sparkContext
      .parallelize(Seq((0, 35), (1, 41), (2, 51)))
      .toDF("id", "cred_score")

    val trust_score_df = spark.sparkContext
      .parallelize(Seq((0, 23), (1, 44), (2, 52)))
      .toDF("id", "trust_score")

    val health_score_df = spark.sparkContext
      .parallelize(Seq((0, 31), (1, 45), (2, 53)))
      .toDF("id", "health_score")

    val study_score_df = spark.sparkContext
      .parallelize(Seq((0, 31), (1, 45), (2, 53)))
      .toDF("id", "study_score")

    val intellect_score_df = spark.sparkContext
      .parallelize(Seq((0, 31), (1, 45), (2, 53)))
      .toDF("id", "intellect_score")

So we went through a couple of rounds of solutions. After a few rough patches, we finally arrived at the best possible solution for such problem statements. Each solution has its own advantages and disadvantages.

 Solution 1 : (Traditional looping)

We can always take the traditional looping approach: iterate over the dataframes as they become available and perform the join operations sequentially.

/*Solution 1*/

    // Join the accumulated dataframe with the next score dataframe on "id",
    // keeping only the id and the current score column (renamed to "score").
    def joinDfs(a: DataFrame, b: DataFrame, c: String): DataFrame = {
      val joinDF = a.join(b.select("id", c), Seq("id"))
        .select(a.col("id"), b.col(c).as("score"))
      println(c + " DF count : " + joinDF.count)
      joinDF
    }

    val cols = Map(
      "trust_score"     -> trust_score_df,
      "health_score"    -> health_score_df,
      "study_score"     -> study_score_df,
      "intellect_score" -> intellect_score_df)

    var joinedDF = cred_score_df.select(col("id"), col("cred_score").alias("score"))
    val colSeq = Seq("id", "score")
    // Empty (id, score) dataframe used as the seed for the unions.
    var finalDF = Seq.empty[(Int, Int)].toDF(colSeq: _*)

    // Join and union each score dataframe into the final result, one at a time.
    for (col_name <- cols) {
      joinedDF = joinDfs(joinedDF, col_name._2, col_name._1)
      joinedDF.printSchema()
      finalDF = finalDF.union(joinedDF)
      finalDF.show(false)
    }


DAG : 



This is the easier and more straightforward solution to implement, and it produces the expected result. But it doesn't tick all the boxes of the Spark framework:

a. The dataframes are processed sequentially and multiple jobs are created, which takes more time (each count and show inside the loop triggers its own job; a leaner variant is sketched below), but in return you avoid most of the memory and shuffle related issues.

b. If the dataframes are huge in volume this would still work, but at the expense of processing time, which grows with the number and size of the dataframes.
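
A minimal sketch of that leaner variant, assuming the cols map and seed dataframe from above and a hypothetical joinDfsLazy helper that drops the count call, so that nothing is computed until the final result is actually needed:

    // Hypothetical variant of joinDfs without the eager count() call.
    def joinDfsLazy(a: DataFrame, b: DataFrame, c: String): DataFrame =
      a.join(b.select("id", c), Seq("id"))
        .select(a.col("id"), b.col(c).as("score"))

    var lazyJoinedDF = cred_score_df.select(col("id"), col("cred_score").alias("score"))
    var lazyFinalDF = Seq.empty[(Int, Int)].toDF("id", "score")

    // The joins and unions below only build up the plan; no job runs yet.
    for (col_name <- cols) {
      lazyJoinedDF = joinDfsLazy(lazyJoinedDF, col_name._2, col_name._1)
      lazyFinalDF = lazyFinalDF.union(lazyJoinedDF)
    }

    // Only this action materialises the result.
    lazyFinalDF.show(false)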

Solution 2 : (Maximum parallelism)

We can utilize the dataframe API to push the parallelism to a much higher extent, as follows:

  /* Solution 2 */

    var parallelDF = cred_score_df.select(col("id"), col("cred_score").alias("score"))

    val finalDF1 = cols
      .map { x =>
        // Rename the current score column to "score" before joining on "id".
        val scoreDF = x._2.select(col("id"), col(x._1).as("score"))
        parallelDF = parallelDF
          .join(scoreDF, Seq("id"))
          .select(parallelDF.col("id"), scoreDF.col("score"))
          .distinct()
        println(x._1 + " DF count : " + parallelDF.count)
        parallelDF
      }
      .reduce(_.union(_))

Lineage : 

This approach makes the most of the dataframe APIs, using them to their full potential, and gives the maximum parallelism and scalability for large datasets. But it has its limitations:

i. It can seriously blow up the lineage: if a large number of dataframes are generated, the query plan keeps growing, the driver spends more and more time analysing and optimising it, and the performance of the application can suffer badly.

ii. If the data source behind a dataframe has issues that introduce duplicates, the data size can blow up to terabytes that no longer fit in the executors' memory, causing the application to fail and potentially bringing the cluster down.

iii. The most significant issue with this approach is that a very large lineage (DAG) can lead to unexpected failures of the application, such as driver-side StackOverflowErrors, because of Spark's planning limitations (a common mitigation via checkpointing is sketched below).
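
One possible mitigation is to truncate the lineage periodically with Dataset.checkpoint(), which materialises the intermediate result and lets Spark forget the plan that produced it. A rough sketch of Solution 2 adapted along those lines; the checkpoint directory path and the checkpointEvery interval are illustrative values only:

    // Configure a checkpoint directory once per application (example path).
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    val checkpointEvery = 10  // arbitrary interval chosen for illustration

    var chained = cred_score_df.select(col("id"), col("cred_score").alias("score"))
    var processed = 0

    val finalDF2 = cols
      .map { x =>
        val scoreDF = x._2.select(col("id"), col(x._1).as("score"))
        chained = chained
          .join(scoreDF, Seq("id"))
          .select(chained.col("id"), scoreDF.col("score"))
          .distinct()
        processed += 1
        // Every few joins, materialise the result and drop the accumulated
        // lineage so the query plan stays small.
        if (processed % checkpointEvery == 0) {
          chained = chained.checkpoint()
        }
        chained
      }
      .reduce(_.union(_))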

Conclusion:

    So both approaches will do the trick, but you need to scrutinize your situation carefully before implementing either one. Let me give you some guidelines to help you decide on the correct approach:

    a. You should go with the more traditional approach (Solution 1) in the below circumstances:
            i. The number of dataframes being generated is limited, which prevents the creation of larger lineages.
            ii. The individual dataframes are very big in size.
     b. You should go with the maximum-parallelization solution (Solution 2) in the below circumstances:
            i. You have a proper understanding of the data being processed, so it won't cause duplicates or cartesian products that would make the application far less performant.
            ii. The lineage stays well within what Spark can plan comfortably (see the quick check sketched below).
            iii. The data is produced through proper data cleansing and derived from a proper data model; in that case this is the more efficient solution.
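
If you are not sure which side of that line a workload falls on, the plan and lineage can be inspected directly before committing to an approach. A rough sketch of such a check on the Solution 2 result; the 1000-line threshold is an arbitrary illustration, not a Spark limit:

    // Print the logical and physical plans; a plan running to thousands of
    // lines is a warning sign that the lineage is getting out of hand.
    finalDF1.explain(true)

    // The RDD debug string shows the full lineage of the underlying RDDs.
    val lineage = finalDF1.rdd.toDebugString
    println(lineage)

    // Illustrative heuristic only: if the lineage description has grown very
    // long, prefer Solution 1 or add periodic checkpoints.
    if (lineage.split("\n").length > 1000) {
      println("Lineage is very deep - consider Solution 1 or checkpointing.")
    }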
