Towards QA II: Testing Spark Apps

Overview

This post is part II in our series on QA. Previously we discussed the concepts of property based testing (PBT) in the context of an individual application. In this post we will focus on applying PBT and other techniques to test applications in Spark.

We will focus on the most popular testing library for Spark: spark-testing-base.

sscheck is an honourable mention, but in the interest of brevity we will not be covering it here.

Unit testing

Spark-testing-base allows us to write concise unit tests for Spark applications without the boilerplate needed to set up and tear down SparkContexts. Let's take a look at some of the operations it supports...

When setting up a project, don't forget to:

1. Increase the heap and perm gen size: javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")
2. Disable parallel test execution: parallelExecution in Test := false
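
Putting these together, a minimal build.sbt might look something like the sketch below. The project name and versions are illustrative assumptions; pick the spark-testing-base artifact that matches your Spark version.

    // build.sbt - minimal sketch; versions are illustrative assumptions
    name := "spark-testing-demo"

    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"         % "2.0.0" % "provided",
      "org.apache.spark" %% "spark-streaming"    % "2.0.0" % "provided",
      // spark-testing-base releases are tagged with the Spark version they target
      "com.holdenkarau"  %% "spark-testing-base" % "2.0.0_0.5.0" % "test"
    )

    // javaOptions only apply to forked JVMs, so fork the tests
    fork in Test := true

    // 1. Increase heap and perm gen size
    javaOptions ++= Seq("-Xms512M", "-Xmx2048M",
      "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")

    // 2. Disable parallel execution
    parallelExecution in Test := false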

Initialisation:

Spark-testing-base handles the setup and teardown of SparkContexts for you by means of the SharedSparkContext trait. This automatically creates a context in local mode and shares it amongst all tests in a suite. Because Akka will not immediately unbind on shutdown, the trait also clears the spark.driver.port property for you.

All we need to do is create a normal ScalaTest suite and mix in SharedSparkContext:

    class MyTests extends FunSuite with SharedSparkContext

Basic RDD comparison:

    test("test rdd comparison") {
      val expectedRDD = sc.parallelize(List(1, 2, 3))
      val resultRDD = sc.parallelize(List(3, 2, 1))

      /* Assert equal without ordering,
      and not equal with ordering */
      assert(RDDComparisons
        .compare(expectedRDD, resultRDD).isEmpty)
      assert(RDDComparisons
        .compareWithOrder(expectedRDD, resultRDD).nonEmpty)
    }

Here there are two cases: comparing with ordering (the RDDs must contain the same elements in the same order) and without ordering (the same elements in any order).

Property based testing on RDDs:

Following on from the previous post, we can also apply PBT in Spark. As with ScalaCheck, values for primitive types can be generated automatically; for custom classes you will need to implement a generator. An example of PBT in Spark is shown below:

test("filter works as expected") {
        
      /* Set min/max size of RDD */
      implicit val generatorDrivenConfig =
        PropertyCheckConfig(minSize = 10, maxSize = 200)

      /* Generate random Employee's 
      Assert filtering for a property is true for all the results */
      val property =
        forAll(RDDGenerator.genRDD[Employee](sc) {
          val generator: Gen[Employee] = for {
            name <- Gen.oneOf("Mark", "Ignas", "Christos")
            salary <- Gen.choose(50000, Int.MaxValue)
          } yield {
            if(name equals "Mark")
              Employee(name, salary*salary)
            else
              Employee(name, salary)
          }
          generator
        }) {
          rdd => rdd.filter(_.salary < 100000).collect() 
          forall(_.salary < 100000)
        }

      check(property)
    }
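
(The examples in this post operate on an Employee type whose definition isn't shown above; the one-liner below is simply an assumption for illustration.)

    case class Employee(name: String, salary: Int)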

The library also allows for comparisons of DataFrames and Datasets, but we will focus instead on testing streaming data, as much of the other testing follows a similar style to the RDD comparisons.
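
For reference, a DataFrame equality check only takes a couple of lines. The sketch below uses the library's DataFrameSuiteBase trait and its assertDataFrameEquals helper; treat the exact method names as an assumption and check the documentation for your version.

    class MyDataFrameTests extends FunSuite with DataFrameSuiteBase {

      test("dataframes with the same rows are equal") {
        import sqlContext.implicits._

        val expected = Seq(("Abraham", 20000), ("Lincoln", 20000))
          .toDF("name", "salary")
        val result = Seq(("Abraham", 20000), ("Lincoln", 20000))
          .toDF("name", "salary")

        /* Fails with a descriptive error if the DataFrames differ */
        assertDataFrameEquals(expected, result)
      }
    }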

Streaming data:

Spark-testing-base tests operations on streaming data by asserting on the results of each batch in a stream; in this way it simulates a stream and its operations. Just like with RDD comparisons, we can compare with or without ordering.

The example below shows how we can test operations on streaming data. The operation under test is expected to double the salary of any employee named Abraham. To make the example slightly less trivial, it also demonstrates how external logic / rules engines can be integrated into Spark code (or, here, test cases): it loads an xls file containing the rule - match the name Abraham and apply the transformation - and executes the rules in the popular Drools engine through a KIE session. The details aren't important for this post, so don't worry if this isn't familiar - what matters is the Spark code.

Basic streaming equality check:

/* Assumes a class that extends StreamingSuiteBase */

    test("basic transformation") {
      val input = List(
      Employee("Abraham", 10000), 
      Employee("Lincoln", 20000))
      val expected = List(
      Employee("Abraham", 20000), 
      Employee("Lincoln", 20000))
      testOperation[Employee, Employee]
      (input, transformInput _, expected, ordered = false)
    }

    /* The transformation we are applying */
    def transformInput(employees: DStream[Employee]): DStream[Employee] = {
      employees.transform(rdd => {
        val evaluatedEmployees = rdd.mapPartitions(incomingEvents => {
          val ruleUrl = new DefaultResourceLoader()
            .getResource("classpath:demo/employees/rules.xls")
            .getURL

          evalRules(ruleUrl)(incomingEvents)
        })

        evaluatedEmployees
      })
    }

    /* Code to evaluate rules on each employee */
    def evalRules(resourceFileName: URL)
                 (incomingEvents: Iterator[Employee]): Iterator[Employee] = {
      val ksession = KieSessionFactory.getKieSession(resourceFileName)
      incomingEvents.map { employee =>
        ksession.execute(employee)
        employee
      }
    }

Testing operations over windows:

The example below is taken from the spark-testing-base documentation and illustrates how you can test windowing operations.

In this example we have the following settings:

  • Batch interval of 1s - a new micro-batch of results is produced every second
  • Window duration of 3s - each window looks as far back as 3s of batches
  • Slide duration of 2s - a new window result is computed every 2s

Hence, we'd expect two windows, (batch2, batch1) and (batch4, batch3, batch2), giving counts of 5 and 7 respectively.

test("CountByWindow with windowDuration 3s and slideDuration=2s") {
val batch1 = List("a", "b")
val batch2 = List("d", "f", "a")
val batch3 = List("f", "g"," h")
val batch4 = List("a")
val input= List(batch1, batch2, batch3, batch4)
val expected = List(List(5L), List(7L))

def countByWindow(ds:DStream[String]):DStream[Long] = {
ds.countByWindow(windowDuration = Seconds(3), slideDuration = Seconds(2))
}

testOperation[String, Long](input, countByWindow _, expected, ordered = true)
}

Note that the streaming simulation times out after 10s, but you can override it with:

  override def maxWaitTimeMillis: Int = 20000

Closing remarks

So far so good - we're able to write clean, concise unit tests for our Spark transformations... so what's the catch? The problem is that, by default, spark-testing-base runs tests in local mode. However good Spark may be at simulating a cluster, we can't escape the need to test on a real cluster too.

The path of least resistance at the moment seems to be to use the SharedMiniCluster trait in place of SharedSparkContext - with the added caveat that $SPARK_HOME must be set properly for this to work. Note that although this works for normal batch operations, it does not appear to support streaming data.
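
Swapping the trait in is a one-line change. A minimal sketch, assuming $SPARK_HOME points at a local Spark installation and the suite sticks to batch operations:

    /* Runs against a Spark mini cluster instead of local mode */
    class MyClusterTests extends FunSuite with SharedMiniCluster {

      test("word count also works on the mini cluster") {
        val counts = sc.parallelize(Seq("a", "b", "a"))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
          .collectAsMap()

        assert(counts("a") === 2)
      }
    }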

If testing on a cluster is a necessity for you, I'd recommend packaging your jobs up in a fat jar and submitting them to a standalone cluster (or YARN) as a sanity check, in addition to the spark-testing-base tests. It should also be possible to automate submission and assertions as part of the build process (e.g. on Jenkins), in conjunction with Spark-job-server to fetch the results. Other solutions, such as the integration tests by Databricks, have had success using Docker with Spark to test in a cluster - but what if Docker is not available/approved in your organisation? Either way, neither of these solutions is yet available as a clean, 'out-of-the-box' library. Watch this space for more information on QA processes, as we may follow up with details on the Docker or job-server based approach for integration testing.
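
As a rough illustration of that kind of build-time sanity check, the snippet below shells out to spark-submit from Scala and fails if the job returns a non-zero exit code. The class name, master URL and jar path are hypothetical placeholders.

    import scala.sys.process._

    /* Submit the assembled fat jar to a standalone cluster as a sanity check.
       The class, master URL and jar path below are hypothetical. */
    val exitCode = Seq(
      "spark-submit",
      "--class", "demo.employees.EmployeePipeline",
      "--master", "spark://spark-master:7077",
      "target/scala-2.11/employee-pipeline-assembly.jar"
    ).!

    assert(exitCode == 0, "Spark job failed when run on the cluster")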