Rapid integration testing for Spark ETL pipelines
November 16, 2020
In this blog, I will discuss a particular problem the Engineering Team at Panaseer faced with our data pipelines and the unique solution we came up with to solve it. I will start with a brief description of how these data pipelines are structured, which tools are used and the process we previously had to go through when modifying and testing these workflows. I will then describe the problem we faced and the solution we engineered, first from a high-level perspective and then with some of the implementation details. This blog is for those with a technical background; however, no specialist knowledge is assumed and code is kept to a minimum.
At Panaseer, the core of our back-end data platform is our complex data pipelines which are responsible for ingesting data from heterogeneous data sources, performing a number of transformations and analysis operations on said data, before a final loading stage into a database which serves our web-client.
This process, typically known as ETL (Extract, Transform, Load), involves a number of different Hadoop technologies such as Hive, a data warehouse built on top of HDFS which we use to hold the intermediate storage tables, as well as Spark, a distributed-computing framework which operates on immutable DataFrames. The power of ETL is that, due to the separation of the pipeline stages, as well as the natural properties of relational data, there is ample opportunity for parallelism:
Data Parallelism: Splitting a sequential file into smaller data files to provide parallel access, à la Hadoop.
Pipeline Parallelism: Simultaneous running of several components on the same data stream, e.g. the transformation stage on data block X can run at the same time as the extract stage for data block X+1.
Component Parallelism: Simultaneous running of multiple processes on different data streams in the same job.
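To make pipeline parallelism concrete, here is a minimal sketch in plain Python (threads and queues from the standard library, not Spark or Hadoop). The stage functions and the doubling "transformation" are invented for illustration; the point is only that the transform stage can work on one block while the extract stage is already producing the next. It shows the structure rather than any real speedup.

```python
import queue
import threading

SENTINEL = None  # marks the end of the stream


def extract(blocks, out_q):
    """Extract stage: emits one block at a time, then a sentinel."""
    for block in blocks:
        out_q.put(block)
    out_q.put(SENTINEL)


def transform(in_q, out_q):
    """Transform stage: handles each block as soon as it has been extracted."""
    while (block := in_q.get()) is not SENTINEL:
        out_q.put([value * 2 for value in block])  # illustrative transformation
    out_q.put(SENTINEL)


def load(in_q, sink):
    """Load stage: appends transformed blocks to an in-memory sink."""
    while (block := in_q.get()) is not SENTINEL:
        sink.append(block)


def run_pipeline(blocks):
    # Bounded queues keep each stage at most one block ahead of the next.
    extracted = queue.Queue(maxsize=1)
    transformed = queue.Queue(maxsize=1)
    sink = []
    stages = [
        threading.Thread(target=extract, args=(blocks, extracted)),
        threading.Thread(target=transform, args=(extracted, transformed)),
        threading.Thread(target=load, args=(transformed, sink)),
    ]
    for stage in stages:
        stage.start()
    for stage in stages:
        stage.join()
    return sink


loaded = run_pipeline([[1, 2], [3, 4], [5]])
```

Because each stage is a single thread reading from a FIFO queue, the blocks arrive at the sink in the order they were extracted.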
In this blog, I am going to talk about a specific problem that we faced with our ETL pipelines, specifically how a user tests any configuration changes that they make.
How our ETL is configured
At Panaseer, the data pipelines begin by ingesting data from a number of different data sources such as ServiceNow, Qualys and Microsoft AD. The flow for collecting and processing data from each of these data sources is unique, meaning we have to write and maintain a large amount of Spark code. To manage this complexity, we created a JSON-based language that expresses the ETL flows as config. We have generic table readers (e.g. HiveReader) and loaders, as well as configurable transformers, which can be chained together to create pipelines of arbitrary length and complexity. Here is an example snippet of this ETL config:
stages = [
  {
    stage = "read"
    readerClass = HiveReader
    tableName = "my_table"
    filterExpr = "column_1 is not null and column_1 != ''"
  },
  {
    stage = "withDataSet"
    dataSetIDs = ["my_table"]
    transforms-loads = [
      {
        stage = "transform"
        transformerClass = ingest.transform.ConfigurableTransformer
        Path = "transformations/custom_transformation.conf"
      }
    ]
  }
]
We find this useful for a number of reasons. Firstly, it allows us to reuse a lot of code easily. For example, we have a configurable transformer into which we can pass specific transformations (more config); these are converted into Spark code and applied to the DataFrame. Secondly, the config code is much more high-level and declarative than lower-level Spark code: it is clear what the config is doing, and most of the low-level technical implementation details are hidden away.
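As a rough illustration of the idea of turning declarative config into a chain of transformations, here is a minimal sketch in plain Python. It is not our actual implementation and does not use Spark: the transformer names (dropNulls, rename), the config keys and the row-as-dict representation are all assumptions made for the example.

```python
from functools import reduce


# Hypothetical transformers: each takes its options and returns a function
# that maps a list of rows (dicts) to a new list of rows.
def drop_nulls(column):
    return lambda rows: [row for row in rows if row.get(column) is not None]


def rename(old, new):
    return lambda rows: [
        {(new if key == old else key): value for key, value in row.items()}
        for row in rows
    ]


# Registry mapping the name used in config to the transformer factory.
TRANSFORMERS = {"dropNulls": drop_nulls, "rename": rename}


def build_pipeline(stages):
    """Turn a list of declarative stage configs into a single callable."""
    steps = []
    for stage in stages:
        options = {key: value for key, value in stage.items() if key != "transformer"}
        steps.append(TRANSFORMERS[stage["transformer"]](**options))
    # Apply the steps left to right, feeding each output into the next step.
    return lambda rows: reduce(lambda acc, step: step(acc), steps, rows)


config = [
    {"transformer": "dropNulls", "column": "column_1"},
    {"transformer": "rename", "old": "column_1", "new": "hostname"},
]
pipeline = build_pipeline(config)
result = pipeline([{"column_1": "web-01"}, {"column_1": None}])
```

The config author only names the transformation and its options; the lookup table and the chaining logic are written once and reused by every pipeline.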
Previously, when any user wanted to test an ETL pipeline configuration change, they were forced to deploy their code to a dev or staging environment with our chosen Hadoop distribution installed.
The flow was something like this:
- Make necessary configuration changes locally to ETL config files
- Sync (deploy) repository with the target environment
Without any confidence the configuration is correct:
- Run the corresponding ETL job
- Tail the logs while waiting for Spark Context to initialise
- Check the logs to see if there were any issues
- If there was an error, go back to the first step and repeat until no more errors were found
The issues with this flow were obvious. Firstly, configuring, deploying and running these ETLs could take up to 15 minutes, resulting in a very slow feedback loop that hurt developer productivity. Secondly, due to the dependence on Hadoop services (Hive, Zookeeper, HBase, Yarn etc.), it was impossible to test the ETLs locally. This mattered because, apart from the transformation unit tests, there was no easy way to test these pipelines end-to-end and plug them into our CI workflows. We needed a solution!
What were the requirements?
- The ETL must be runnable locally with no dependency on any external service, e.g. Zookeeper, Hive, HBase.
- The solution must provide a considerable performance speedup, improving the development experience and developer productivity.
- The solution must allow us to test our ETL pipelines end-to-end (E2E) and be runnable in CI.
Solution high-level view
The solution we came up with was to have a tool, similar to how the AWS Glue Crawler works, to scan Hive and HBase and extract the schemas for the underlying tables. These schemas are then stored in a central schema repository, which we call the Schema Registry. We decided to use raw files in source control, rather than a document database or some other central repository, so a user could easily access these files in the same place as the ETL code and they could copy the files to their local development environment for offline use.
Here is a diagram illustrating the flow described above:
Then, when running the ETL, instead of reading from Hive, we use the schema files to create an empty Spark DataFrame and perform the Spark operations on that. In this way, we are effectively mocking the interaction with Hive whilst being able to test the Spark transformations and their interactions with real DataFrames. The key point here is that we are not testing the data within the DataFrames, we are just testing the logical operations of the ETL pipeline and verifying that any configuration changes result in the correct output schema.
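The reason an empty DataFrame is enough can be sketched without Spark at all. The class below is a hypothetical stand-in written in plain Python: it carries only a schema, and each operation returns a new schema or fails if the config refers to a column that does not exist. The method names and the data types are assumptions for illustration, not Spark's API or our real table definitions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EmptyFrame:
    """A stand-in for an empty DataFrame: it has a schema but no rows."""

    schema: tuple  # ordered (column_name, data_type) pairs

    @property
    def columns(self):
        return [name for name, _ in self.schema]

    def _require(self, column):
        if column not in self.columns:
            raise KeyError(f"unknown column: {column}")

    def select(self, *columns):
        """Keep only the named columns, in the order given."""
        for column in columns:
            self._require(column)
        types = dict(self.schema)
        return EmptyFrame(tuple((column, types[column]) for column in columns))

    def rename(self, old, new):
        """Rename one column, keeping its position and type."""
        self._require(old)
        return EmptyFrame(tuple((new if n == old else n, t) for n, t in self.schema))

    def with_column(self, name, data_type):
        """Replace the column's type if it exists, otherwise append it."""
        if name in self.columns:
            return EmptyFrame(
                tuple((n, data_type if n == name else t) for n, t in self.schema)
            )
        return EmptyFrame(self.schema + ((name, data_type),))


# Column names taken from the schema example in this post; types are assumed.
source = EmptyFrame(
    (("collect_tstamp", "timestamp"), ("criticality", "string"), ("is_deleted", "boolean"))
)
output = (
    source.select("collect_tstamp", "criticality")
    .rename("criticality", "severity")
    .with_column("score", "int")
)
```

No rows are ever processed, yet a misconfigured step (for example, selecting a column that is not in the schema) still fails immediately, and the final schema is available for checking.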
It is worth noting that an alternative solution would be to run all the required Hadoop services, such as Yarn, Zookeeper, Hive and Spark, inside a virtual machine. We tested this approach, but the large memory requirements and long start-up times made it too slow to be practical.
I will now go into more detail about the various stages in the chosen solution.
Generating the schemas
The first thing we needed was a way to obtain the schemas for the underlying Hive/HBase tables. We developed a CLI tool (the crawler), which scans the data stores and extracts the schema for each table. The output is a set of YAML files, one for each table, describing the table's schema. For example:
- column_name: collect_tstamp
- column_name: criticality
- column_name: datasrc_endpoint
- column_name: is_deleted
These files are then stored in our central GitHub repository where they are available for both development and customer-site use.
NB. Obviously table schemas evolve over time so we run this CLI tool as part of a nightly workflow to ensure the schemas are kept up-to-date.
New flow for testing ETL config changes
We added a --dry-run flag to our ETL runner script which triggers the HiveReader (the component that reads a generic table from Hive) to interact with the Schema Registry instead. The Schema Registry reads the necessary schema YAML file from the local filesystem and then creates an empty DataFrame based on the provided schema.
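A minimal sketch of what such a registry lookup could look like is shown below, in plain Python with no Spark involved. Several things here are assumptions for illustration: the class and method names, the one-file-per-table naming (table name plus .yaml), the header-only dictionary standing in for an empty DataFrame, and the deliberately tiny line parser, which only understands the single-key entries shown earlier. A real implementation would presumably use a proper YAML library.

```python
import tempfile
from pathlib import Path


class SchemaRegistry:
    """Resolves table names to schema files on the local filesystem."""

    def __init__(self, root):
        self.root = Path(root)

    def column_names(self, table_name):
        # Assumed layout: one file per table, named <table_name>.yaml.
        path = self.root / f"{table_name}.yaml"
        names = []
        for line in path.read_text().splitlines():
            entry = line.strip().lstrip("-").strip()
            if entry.startswith("column_name:"):
                names.append(entry.split(":", 1)[1].strip())
        return names

    def empty_table(self, table_name):
        """Return a header-only table: the columns are known, the rows are empty."""
        return {"columns": self.column_names(table_name), "rows": []}


# Demonstration against a temporary directory standing in for the repository.
with tempfile.TemporaryDirectory() as root:
    Path(root, "my_table.yaml").write_text(
        "- column_name: collect_tstamp\n- column_name: criticality\n"
    )
    table = SchemaRegistry(root).empty_table("my_table")
```

In a dry run, the reader would hand this empty table to the rest of the pipeline in place of the data it would normally fetch from Hive.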
An optimised local Spark Context is spun up and the ETL is run against this (empty) DataFrame exactly as it would be in the normal flow. After the final transformation, instead of writing back to a Hive table, the HiveLoader performs schema validation by comparing the schema of the resulting DataFrame with the expected schema, which it once again obtains from the Schema Registry.
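The validation step boils down to comparing two schemas and reporting the differences. Here is a minimal sketch in plain Python, assuming each schema is a mapping from column name to data type; the function name, the error messages and the data types are invented for the example and are not taken from our codebase.

```python
def validate_schema(actual, expected):
    """Compare an output schema with the expected one and list the differences.

    Both arguments map column name to data type. An empty list means the
    schemas match.
    """
    errors = []
    for name in sorted(expected.keys() - actual.keys()):
        errors.append(f"missing column: {name}")
    for name in sorted(actual.keys() - expected.keys()):
        errors.append(f"unexpected column: {name}")
    for name in sorted(actual.keys() & expected.keys()):
        if actual[name] != expected[name]:
            errors.append(
                f"type mismatch for {name}: expected {expected[name]}, got {actual[name]}"
            )
    return errors


# Assumed types, for illustration only.
expected = {"collect_tstamp": "timestamp", "criticality": "string", "is_deleted": "boolean"}
actual = {"collect_tstamp": "timestamp", "criticality": "int", "extra": "string"}
errors = validate_schema(actual, expected)
```

Returning the full list of differences, rather than stopping at the first one, lets a developer fix several config mistakes in a single iteration.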
Here is a diagram illustrating the new flow:
The key point is that we have removed the dependency on any external storage system and replaced it with the Schema Registry, which is just a component to manage the interactions with the local file system!
What we have gained
The first and most obvious benefit is that the ETL can be run locally, with no dependence on Hadoop services such as Zookeeper, Hive or HBase. As a result, the ETL runs roughly 10x faster, which allows for quicker iterations and easier testing of ideas. With the schema files stored in the same place as our ETL config, we can now develop integration tests that run directly in our IDE, rather than having to run a script on some environment and tail the logs to look for errors. As such, the development experience is dramatically improved. Finally, we are able to package these tests into a Gradle task which can be run as part of our continuous integration workflows, giving us increased confidence that our code changes are not unexpectedly breaking anything!
Thanks for reading!
I hope that you learned something and please leave a comment with any suggestions if you think this process could be improved.