Data aggregation in HBase: A minimalistic approach
April 11, 2016
HBase is designed for close to real-time analytics on large data sets. While optimized for speed and scalability, HBase doesn’t provide advanced query capabilities out of the box. More often than not, we would like to aggregate data on the fly, in real time, without maintaining precomputed materialized views. Let’s look at some of the options for achieving this.
Apache Hive
Apache Hive is the first and most obvious choice. Aggregation functions are built in and accessible via a SQL-like interface. However, Hive queries typically take a considerable amount of time to run, and Hive is not built for interactive use, so we are restricted to offline batch analysis. Provided we are looking at data that changes infrequently (say, once a day), this is not a major restriction. Real-time data can still be loaded into HBase and enhanced with pre-aggregated views provided by Hive. However, what if we simply can’t determine all of the aggregations upfront? That degree of flexibility is necessary for our data science team, who are in charge of designing new visualisations. Ad-hoc aggregations over large data sets are key to the successful delivery of new insights.
HBase
Another solution, slightly more involved than Hive, is to use HBase coprocessors. Coprocessors are akin to database triggers and stored procedures. They bring custom code to the region servers, which helps keep performance consistent at scale by distributing the computational load and processing the data where it is stored. Custom computations, such as aggregation functions, are implemented by a particular kind of coprocessor, the so-called endpoint coprocessor. Unfortunately, implementing an endpoint coprocessor is relatively complicated and involves a third-party library (Google’s Protocol Buffers).
Apache Phoenix
Finally, we could reach for tools that enhance HBase with additional analytics capabilities. Apache Phoenix looks promising in many regards, but there is a learning curve involved as well as an additional dependency (Phoenix server). More importantly, though, tapping into a third-party solution creates a risk of data model lock-in and loss of control over low-level details.
How to set up HBase for data aggregation
Now, can we use the HBase client API to build a client-side aggregation application without adding much overhead?
A web client submits a query request through simple yet flexible API endpoints. There are many ways the endpoints could be implemented. Suppose aggregation queries use the following path:
/{table_name}/{aggregation}/{field1}/by/{field2}
In a hypothetical application processing transport-related sensor data we could, for example, evaluate driving efficiency by looking at the average mileage (miles per gallon):
GET /truck_mileage/avg/mpg/by/truckid
A corresponding pseudo-SQL:
SELECT truckid, avg(mpg) avg_mpg FROM truck_mileage GROUP BY truckid;
The example is taken from the Hortonworks Hadoop Tutorial; check it out on their website.
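To make the mapping between the path and the pseudo-SQL concrete, here is a minimal sketch in plain Java. The class name `AggregationPath`, its fields, and the restriction to `avg`, `sum` and `count` are assumptions made for illustration; they are not part of the application described in this post.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: parses /{table_name}/{aggregation}/{field1}/by/{field2}. */
final class AggregationPath {
    // Identifiers are limited to word characters so they can be echoed into pseudo-SQL.
    private static final Pattern PATH =
            Pattern.compile("^/(\\w+)/(avg|sum|count)/(\\w+)/by/(\\w+)$");

    final String table;
    final String aggregation;
    final String valueField;
    final String groupField;

    private AggregationPath(String table, String aggregation,
                            String valueField, String groupField) {
        this.table = table;
        this.aggregation = aggregation;
        this.valueField = valueField;
        this.groupField = groupField;
    }

    /** Splits the path into query criteria, rejecting anything that does not match. */
    static AggregationPath parse(String path) {
        Matcher m = PATH.matcher(path);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported aggregation path: " + path);
        }
        return new AggregationPath(m.group(1), m.group(2), m.group(3), m.group(4));
    }

    /** Renders the criteria as the equivalent pseudo-SQL statement. */
    String toPseudoSql() {
        return String.format("SELECT %s, %s(%s) %s_%s FROM %s GROUP BY %s;",
                groupField, aggregation, valueField,
                aggregation, valueField, table, groupField);
    }

    public static void main(String[] args) {
        // prints: SELECT truckid, avg(mpg) avg_mpg FROM truck_mileage GROUP BY truckid;
        System.out.println(parse("/truck_mileage/avg/mpg/by/truckid").toPseudoSql());
    }
}
```

Rejecting unknown aggregation names at parse time keeps malformed requests from ever reaching the data source.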
An obligatory REST API transforms client requests into query criteria and pushes them down to the Query Adapter. The adapter is a SQL abstraction on top of a data source, such as HBase.
public interface QueryAdapter {
    // Fluent, SQL-like builder: select(...).from(...).aggregate(...)
    QueryAdapter select(String... fields);
    QueryAdapter from(String tableName);
    // Runs the query and returns the aggregated result as a graph
    Graph aggregate(Aggregation<?> aggregation);
}

public interface Aggregation<T extends Number> {
    T calculate();
}

public class Graph {
    // This is a DTO, a graph representation of tabular data
}
Here is how the adapter is used in one of the API endpoints:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class QueryController {

    @Autowired
    private QueryAdapter queryAdapter;

    // Handles e.g. GET /truck_mileage/avg/mpg/by/truckid
    @RequestMapping("/{tableName}/avg/{field1}/by/{field2}")
    public Graph avg(@PathVariable String tableName,
                     @PathVariable String field1,
                     @PathVariable String field2) {
        return queryAdapter
                .select(field1, field2)
                .from(tableName)
                .aggregate(new Average(field1, field2));
    }

    // Endpoints for other aggregation functions: sum, count etc.
}
The adapter implementation depends on the underlying data source. An HBaseQueryAdapter turns queries into Get or Scan operations via the HBase client API. Ideally, we only use gets or range scans by rowkey. The exact approach depends on the data model. Secondary indexing should be considered if a query would otherwise require a full-table scan. Indexing can be achieved by generating lookup tables, which are kept in sync with the master tables by means of a coprocessor (RegionObserver).
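Why gets and range scans by rowkey are cheap can be illustrated without a cluster. HBase keeps rows sorted by row key, so the sketch below uses a `TreeMap` as an in-memory stand-in for a table. It is not the HBase client API, and the `<truckid>|<timestamp>` row key layout is an assumption made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory stand-in for a table sorted by row key; not the HBase client API. */
final class RowKeyRangeScan {
    // Hypothetical row key layout: "<truckid>|<timestamp>" -> mpg reading.
    private final NavigableMap<String, Double> rows = new TreeMap<>();

    void put(String truckId, String timestamp, double mpg) {
        rows.put(truckId + "|" + timestamp, mpg);
    }

    /** Point lookup by full row key, the analogue of a Get. Returns null if absent. */
    Double get(String truckId, String timestamp) {
        return rows.get(truckId + "|" + timestamp);
    }

    /** Range scan over [startRow, stopRow): only keys with the truck's prefix are visited. */
    List<Double> scanByTruck(String truckId) {
        String startRow = truckId + "|";
        String stopRow = upperBound(startRow);
        return new ArrayList<>(rows.subMap(startRow, true, stopRow, false).values());
    }

    // Exclusive upper bound for all keys starting with the prefix: bump its last character.
    // Safe here because the prefix always ends with '|'.
    private static String upperBound(String prefix) {
        int last = prefix.length() - 1;
        return prefix.substring(0, last) + (char) (prefix.charAt(last) + 1);
    }

    public static void main(String[] args) {
        RowKeyRangeScan table = new RowKeyRangeScan();
        table.put("A1", "2016-04-01", 10.0);
        table.put("A1", "2016-04-02", 12.0);
        table.put("B7", "2016-04-01", 30.0);
        System.out.println(table.scanByTruck("A1"));
    }
}
```

The separator in the row key matters: with it, a scan for truck `A1` does not pick up rows belonging to truck `A10`.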
Transforming query results into a graph is a crucial part of our data pipeline. Each and every Result object is passed to a Result Mapper which knows how to turn it into a graph node. An Aggregation Filter keeps track of the total count of the aggregated values and sums them up. Once all of the results are processed, the graph is updated with a list of collected nodes. Each node bears an aggregated value. In the simplest case it’s either a total sum, a total count or an average.
The bottom line is that, regardless of how much data flows through the pipeline, the mapping/aggregation module only keeps the bare minimum state required to group nodes by values of a certain field and uses just two numeric values while aggregating them.
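The constant-state idea can be sketched as follows. This is a minimal illustration rather than the actual Result Mapper or Aggregation Filter: the class `GroupedAggregator` and its methods are hypothetical names, and rows are reduced to a (group, value) pair. Each group holds only a running sum and a count, from which the sum, count and average are all derived.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical streaming aggregator: per group it stores only a running sum and a count. */
final class GroupedAggregator {
    private static final class Accumulator {
        double sum;
        long count;
    }

    // One accumulator per distinct value of the grouping field, in first-seen order.
    private final Map<String, Accumulator> groups = new LinkedHashMap<>();

    /** Folds one (group, value) pair into the running state; the row itself is not retained. */
    void accept(String group, double value) {
        Accumulator acc = groups.computeIfAbsent(group, k -> new Accumulator());
        acc.sum += value;
        acc.count++;
    }

    Map<String, Double> sums() {
        Map<String, Double> result = new LinkedHashMap<>();
        groups.forEach((group, acc) -> result.put(group, acc.sum));
        return result;
    }

    Map<String, Long> counts() {
        Map<String, Long> result = new LinkedHashMap<>();
        groups.forEach((group, acc) -> result.put(group, acc.count));
        return result;
    }

    /** An accumulator only exists after at least one value, so count is never zero here. */
    Map<String, Double> averages() {
        Map<String, Double> result = new LinkedHashMap<>();
        groups.forEach((group, acc) -> result.put(group, acc.sum / acc.count));
        return result;
    }

    public static void main(String[] args) {
        GroupedAggregator byTruck = new GroupedAggregator();
        byTruck.accept("A1", 10.0);
        byTruck.accept("B7", 30.0);
        byTruck.accept("A1", 14.0);
        System.out.println(byTruck.averages());
    }
}
```

Memory use grows with the number of distinct groups, not with the number of rows scanned.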