Deterministic scale-out for spark jobs under increased load

Dec 13, 2019·
Dr. Georg Heiler
Dr. Georg Heiler
· 2 min read
blog

A lot of ETL jobs run on a daily basis. For these jobs, the trick below is not relevant. However in case you need to backfill data or want to compute analytical queries on more than one day (atomic unit of execution) things can get tricky.

In case functions like repartition oder wide transformations like join or an aggregation are used they heavily depend on the paralellism, which in turn depends on the amount of data being processed.

Warning
In case the input data is not static for example when backfilling or executing analytical queries over multiple time frames time consuming hand tuning of various spark configuration parameters is required.

Additionally the shuffle IO could be unnecessarily large. When simply processing a greater than expected number of data.

Instead breaking the job down into iterated (i.e. daily parts) might limit the required IO. A naive implementation would use Oozie to perform such a task. However:

  • this would spin up an additional master container for each job.
  • Further limitations are: an additional tool would need to be learnt.
  • Moreover, any pre setup or post completion tasks like the calculation of a dataframe reused in all iteration steps would need to be performed each time over and over again.

This is inefficient.

It is more efficient to use spark itself for this task and iterate there and achieve paralellism on a different level. One more benefit is that you can control exactly the size of the outputted partitions. A repartition(numberPartitions) will now work on an atomic unit of data, and thus has a reduced shuffle IO load and is easier to configure.

This results in a job which scales more deterministically with increased amounts of data.

A function like:

def iterationModule[A](iterationItems: Seq[A], functionToApply: A => Any, parallel: Boolean = false): Unit =
    if (parallel) {
      iterationItems.par.foreach(functionToApply)
    }
    else {
      iterationItems.foreach(functionToApply)
    }

Can be called where functionToApply computes the desired result and outputs the relevant data for each partitions (daily) transformed data back to a storage system.

An example of how to apply the iteration module:

def doStuff(inputPartition: String, reusedStuff: String): Unit = println(inputPartition + "__"+ reusedStuff)

iterationModule[String](Seq("20190101", "20190102"), doStuff(_, "foo"))
Dr. Georg Heiler
Authors
senior data expert
Georg is a Senior data expert at Magenta and a ML-ops engineer at ASCII. He is solving challenges with data. His interests include geospatial graphs and time series. Georg transitions the data platform of Magenta to the cloud and is handling large scale multi-modal ML-ops challenges at ASCII.