At Adevinta Spain we are building a Data Platform where we use multiple applications and frameworks, some of them based on Spark.
In order to increase the quality of our Spark applications we wanted to run tests in the same way as we did with other frameworks. That is, we wanted to be able to run unit, integration and end-to-end tests.
This article explains the way Spark tests are run at Adevinta Spain. Hopefully, it will be useful for other big data developers looking for ways to improve the quality of their code and, at the same time, their CI pipelines.
Unit, integration and end-to-end tests
When working with Spark, developers usually need to implement these three kinds of tests. Other tests, such as smoke tests or acceptance tests, are outside the scope of this article, so I will not cover them.
- Unit Tests: at this level we will be dealing with code that does not require a Spark Session in order to work. Also, this kind of code does not talk to the outside world.
- Integration Tests: at some point we will need to use a Spark Session. At this level we will be testing Spark transformations, and in many cases we will have to deal with external systems such as databases, Kafka clusters, etc.
- End-to-end Tests: our application will probably be composed of several Spark transformations working together in order to implement some feature required by some user. Here, we will be testing the whole application.
Spark project layout
This is a typical Scala project layout. I think this layout should work for most use cases, but if it does not work for you, I hope it will at least bring some inspiration or ideas to your testing implementation.
src/
├── main
│   └── scala
│       └── example
│           ├── app
│           │   └── AwesomeApp.scala
│           ├── job
│           │   └── AwesomeJob.scala
│           └── service
│               └── AwesomeService.scala
└── test
    ├── resources
    │   ├── awesomejob
    │   │   └── sourcepath
    │   │       └── awesome.json
    │   └── log4j.properties
    └── scala
        └── example
            ├── app
            │   └── AwesomeAppEndToEndTest.scala
            ├── job
            │   └── AwesomeJobIntegrationTest.scala
            ├── service
            │   └── AwesomeServiceTest.scala
            └── SharedSparkSessionHelper.scala
Under the app package we will find the classes in charge of running our Spark applications. Typically we will have only one Spark application.
A Spark application should implement some kind of transformation. Modules under the job package run Spark jobs that require a Spark Session.
Sometimes business logic does not require a Spark Session in order to work. In such cases, we can implement the logic in a separate module under the service package.
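As an illustration of logic that needs no Spark Session, here is a minimal sketch of what a service module could contain. The `normalizeCountry` function and its rules are hypothetical and are not taken from the example project. Only the `AwesomeService` name comes from the layout above.

```scala
import java.util.Locale

// Hypothetical stand-in for example.service.AwesomeService.
// Pure Scala: no Spark Session, no I/O, so plain unit tests are enough.
object AwesomeService {

  /** Trims and upper-cases a raw country code, keeping it only if it is exactly two letters. */
  def normalizeCountry(raw: String): Option[String] =
    Option(raw)                                  // a null input becomes None
      .map(_.trim.toUpperCase(Locale.ROOT))
      .filter(_.matches("[A-Z]{2}"))
}

// Minimal checks, written with plain assertions to keep the sketch dependency-free.
object AwesomeServiceChecks {
  def run(): Unit = {
    assert(AwesomeService.normalizeCountry(" es ").contains("ES"))
    assert(AwesomeService.normalizeCountry("Spain").isEmpty)
    assert(AwesomeService.normalizeCountry(null).isEmpty)
  }
}
```

A job could then call such a function from inside a transformation, while the function itself stays testable at the unit level.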
Shared Spark Session
One of the biggest problems to solve when running Spark tests is isolation: running one test should not affect the results of another. To achieve this we need a Spark Session for each set of tests, so that their results do not affect other sets that also require a Spark Session.
So, we need a mechanism that enables us to start, clear and stop a Spark Session whenever we need to (before and after a set of related Spark tests).
The details of the implementation are explained below:
beforeAll: a ScalaTest function that runs once, before any of the tests in our test class. We will be using this function for starting our Spark Session.
sparkConf: this function enables us to start different Spark Sessions with different Spark configurations.
embedded hive: spark-warehouse and metastore_db are folders used by Spark when Hive support is enabled. Different Spark Sessions in the same process cannot use the same folders. Because of that, we need to create random folders for every Spark Session.
beforeEach: a ScalaTest function that creates a temporary path, which is useful when our Spark tests end up writing results to some location.
afterEach: clears and resets the Spark Session at the end of every test. It also removes the temporary path.
afterAll: stops the current Spark Session after the set of tests has run. In this way we will be able to start a new Spark Session if it is needed (if there is another set of tests requiring the use of Spark).
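As a rough illustration of the points above, a trait along these lines could look like the following sketch. It is not necessarily the exact implementation used in the example project. It assumes ScalaTest 3.x and Spark with the spark-hive module on the test classpath. The `local[2]` master, the temporary directory prefixes and the `deleteRecursively` helper are illustrative choices, and `afterEach` here only clears cached data.

```scala
import java.io.File
import java.nio.file.Files

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}

trait SharedSparkSessionHelper extends BeforeAndAfterAll with BeforeAndAfterEach { this: Suite =>

  private var _spark: SparkSession = _
  private var _path: File = _

  /** Session shared by all the tests of the suite mixing in this trait. */
  protected implicit def spark: SparkSession = _spark

  /** Temporary location for test output, recreated before every test. */
  protected def path: String = _path.getAbsolutePath

  /** Override in a suite to run with a different Spark configuration. */
  protected def sparkConf: SparkConf = {
    // Fresh folders per session, so embedded Hive never reuses
    // spark-warehouse or metastore_db (this sketch does not delete them afterwards).
    val hiveRoot = Files.createTempDirectory("spark-hive-").toFile.getAbsolutePath
    new SparkConf()
      .setMaster("local[2]")
      .setAppName(getClass.getName)
      .set("spark.sql.warehouse.dir", s"$hiveRoot/spark-warehouse")
      .set("spark.hadoop.javax.jdo.option.ConnectionURL",
        s"jdbc:derby:;databaseName=$hiveRoot/metastore_db;create=true")
  }

  // Starts the session once, before any test of the suite.
  override protected def beforeAll(): Unit = {
    _spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    super.beforeAll()
  }

  // Gives every test its own empty output directory.
  override protected def beforeEach(): Unit = {
    _path = Files.createTempDirectory("spark-test-").toFile
    super.beforeEach()
  }

  // Clears cached data and removes the temporary path after every test.
  override protected def afterEach(): Unit = {
    try super.afterEach()
    finally {
      spark.catalog.clearCache()
      deleteRecursively(_path)
    }
  }

  // Stops the session and forgets it, so the next suite can start a new one.
  override protected def afterAll(): Unit = {
    try super.afterAll()
    finally if (_spark != null) {
      _spark.stop()
      SparkSession.clearActiveSession()
      SparkSession.clearDefaultSession()
      _spark = null
    }
  }

  private def deleteRecursively(file: File): Unit = {
    Option(file.listFiles).foreach(_.foreach(deleteRecursively))
    file.delete()
  }
}
```

A suite would mix the trait in next to its ScalaTest style trait and then use `spark` and `path` inside its tests.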
How it works
The basic idea behind SharedSparkSessionHelper lies in the fact that a Java process holds a single SparkContext, and the active Spark Session is stored in an InheritableThreadLocal (with a process-wide default session as a fallback). When calling the getOrCreate method of SparkSession.Builder we end up either creating a new Spark Session (and storing it in the InheritableThreadLocal) or using the existing one.
So, for example, when running an end-to-end test, because SharedSparkSessionHelper is loaded before anything else (by means of the beforeAll method), the application under test will be using the Spark Session launched by SharedSparkSessionHelper.
Once the test class is finished, the afterAll method stops the Spark Session and removes it from the InheritableThreadLocal leaving our test environment ready for a new Spark Session. In this way, tests using Spark can run in an isolated way.
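The reuse described above can be observed directly with the public SparkSession API. The sketch below is only an illustration under assumptions: the `GetOrCreateDemo` object, the `local[2]` master and the application names are made up for the example, and it expects Spark on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object GetOrCreateDemo {

  /** Returns (the application reused the helper's session, the next suite got a fresh one). */
  def run(): (Boolean, Boolean) = {
    // What beforeAll would do: start the session first.
    val helperSession = SparkSession.builder()
      .master("local[2]")
      .appName("helper")
      .getOrCreate()

    // What the application under test would do later on.
    val appSession = SparkSession.builder().getOrCreate()
    val reused = appSession eq helperSession

    // What afterAll would do: stop the session and forget it.
    helperSession.stop()
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()

    // The next set of tests now gets a brand new session.
    val nextSession = SparkSession.builder()
      .master("local[2]")
      .appName("next-suite")
      .getOrCreate()
    val fresh = !(nextSession eq helperSession)

    // Leave the process clean again.
    nextSession.stop()
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()

    (reused, fresh)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Both flags are expected to be true: the application picks up the session that was started first, and once that session is stopped and cleared the builder creates a new one.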
This article would be nothing without a real example. By following this link you will find a project with sbt, scalatest, scalastyle, sbt-coverage and scalafmt where I use the SharedSparkSessionHelper trait.
This application can be run on any of the available cluster managers, such as Kubernetes, Apache Hadoop YARN, Spark running in cluster mode, or any other of your choice.
Testing Spark applications can seem more complicated than testing applications built with other frameworks, not only because of the need to prepare a data set but also because of the lack of tools that allow us to automate such tests. By means of the SharedSparkSessionHelper trait we can automate our tests in an easy way.
I hope this article was useful. If you enjoy messing around with Big Data, Microservices, reverse engineering or any other computer stuff and want to share your experiences with me, just follow me.