2015-08-10

Thanks to Barclays employees Sam Savage, VP Data Science, and Harry Powell, Head of Advanced Analytics, for the guest post below about the Barclays use case for Apache Spark and its Scala API.

At Barclays, our team recently built an application called the Insights Engine to execute an arbitrary number N of near-arbitrary SQL-like queries in a way that scales as N grows. The queries were non-trivial, each comprising 200-300 lines of SQL, and as Apache Hive scripts they took hours to run over a large dataset. Yet our use case required executing 50 such queries in less than an hour.

While we could have achieved huge speed-ups by moving from Hive to Impala, we determined that SQL was a poor fit for this application. Our solution instead was to design a flexible, super-scalable, and highly optimized aggregation engine built in Scala and Apache Spark, with some help from functional programming. This post discusses the problem and presents the Barclays solution, in particular showing how applying functional programming in Apache Spark was key to the outcome. The techniques discussed in this post will be useful for writing custom machine-learning algorithms or building complex applications, especially when flexibility, abstraction, robustness, stability, and speed of delivery are important.

Design of the Insights Engine

The initial use case for the Insights Engine was to use data from card transactions to calculate KPIs for a Barclays business customer, compare them to the KPIs of similar local businesses, filter them to expose only relevant insights and to preserve other customers’ privacy, and present them in natural-language form. Two examples of such insights might be:

“This year, your business spent £1,000 on electricity, while other hairdressing businesses in Blackpool spent on average £1,200 on electricity.”

“Based on the transactions for the past 12 months, your customers spend £25 on average each time they visit your business. By comparison, customers of other hairdressing businesses in Blackpool spend £23.”

Although these “insights” seem quite general, they follow a common pattern:

Calculate a statistic (in the first case, the sum of the amount spent on electricity in a year).

Compare to a relevant benchmark (in this case similar businesses in the same location).

Filter results (in this case, to preserve privacy).

Rank results (in this case in order of relevance).

Within this pattern, the Insight specification itself is just a set of filters (industry type, location, check for privacy rule, and so on) and operations (sum, mean, median, group by region, and so on)—or, more generally, functions, which are simply parameters of the pattern. By abstracting the pattern away from the functions, we can build a flexible, general engine in which the Insight definition assumes the pattern and only the function parameters need to be specified, through a terse DSL.
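
To make this concrete, here is a minimal sketch (with hypothetical names and types, not our actual DSL) of how a pattern-plus-parameters specification can look in Scala: the engine owns the calculate/compare/filter/rank pattern, and an insight is just a bundle of functions.

```scala
// Hypothetical sketch of an insight specification: the pattern is fixed,
// and each insight only supplies the functions that parameterize it.
case class Transaction(businessId: Long, industry: String, town: String,
                       category: String, amountInPence: Long)

case class InsightSpec(
  name: String,
  select: Transaction => Boolean,               // which transactions count, e.g. electricity spend
  peerGroup: Transaction => (String, String),   // the benchmark group, e.g. (industry, town)
  statistic: Seq[Long] => Double,               // sum, mean, median, ...
  isPublishable: Seq[Long] => Boolean           // privacy rule, e.g. enough peers in the group
)

val electricitySpend = InsightSpec(
  name = "annual electricity spend vs. local peers",
  select = t => t.category == "electricity",
  peerGroup = t => (t.industry, t.town),
  statistic = amounts => amounts.sum.toDouble,
  isPublishable = amounts => amounts.size >= 10
)
```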

Although this approach works, there is often tension between flexibility and performance, and we need both. Parallel computation does not provide enough performance by itself; Hive is too slow because the complexity of each query requires multiple disk reads and writes. The in-memory capabilities of Apache Spark go some way toward addressing this issue (given enough memory), but they still did not provide the performance we needed. However, it turns out that if the patterns and functions can be expressed in such a way that they are commutative and composable (more on this later), then the Insights Engine can run with a single pass over the data—in a way that is not only very fast, but also very scalable.
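
As a toy illustration of why this matters (not our production code): if each statistic is a commutative, associative combine with a zero element, then a tuple of such statistics is too, so any number of aggregations can be folded over the data in a single pass.

```scala
// A pair of statistics (sum and count) carried as one value: because combine is
// commutative and associative, partitions can be reduced in any order and in
// parallel, and both statistics fall out of a single pass over the data.
case class Stats(sum: Long, count: Long) {
  def combine(that: Stats): Stats = Stats(sum + that.sum, count + that.count)
}
object Stats { val zero = Stats(0L, 0L) }

val amountsInPence = Seq(1200L, 2500L, 2300L)
val stats = amountsInPence.map(a => Stats(a, 1L)).foldLeft(Stats.zero)(_ combine _)
val meanPence = stats.sum.toDouble / stats.count   // 2000.0
```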

The Ultimate Requirement: Respecting Customer Data

A big technical, methodological, and design challenge in building such a complex and abstract system is to ensure that data is always used for the benefit of all our customers. At Barclays, we use only anonymized, aggregated, meta-level data for non-personal banking trends and insights. So, step one was to design a privacy engine and get it peer-reviewed by people inside and outside of our team. Once the logic had been agreed, the technical challenge was to have near-provable assurance that the code would do what we want it to do. Therefore, we needed powerful static typing and compile-time checking, and robust frameworks for test-driven development and property-based testing. Again, these requirements pointed us toward using Scala.

In the remainder of this post, we’ll explore functional programming and the details of how we used it within Scala/Spark to build and to optimize the Insights Engine at Barclays. And, by the way, this solution:

Calculates multiple contextual insights

Is a general framework that can be applied far beyond the initial use case

Is really fast (500x faster than Hive), as well as scalable

Is fully tested and passed UAT without material modification

Building Scalable Algorithms with Functional Programming

Step 1: Unlearn Programming

Poetry is the art of giving different names to the same thing. Mathematics is the art of giving the same name to different things. - Henri Poincaré (paraphrased)



The absence of “state” and “mutation” in Scala makes reasoning about the evaluation of expressions, especially in parallel systems, much easier than in imperative languages like Java or Python. This characteristic is essential for harnessing modern hardware, which offers more cores, more nodes, and more RAM rather than faster clock speeds. Moore’s Law is dead; it’s time to get functional!

Adding syntactic sugar to easily write lambda expressions is not sufficient to make a language functional; it is just the start. Functional languages provide many other features and tools that encourage coders to make an entire paradigm shift to a more mathematical way of thinking. (For example, read this fantastic paper about MapReduce.)

By narrowing the focus to only a small number of composable abstract concepts, such as sets, sequences, numbers, functions, function composition, and abstract algebra, functional programming achieves significant expressive power. For example, machine-learning algorithms can take hundreds of lines of imperative code to implement, yet they can be defined in just a handful of equations.

By taking less time typing, a functional programmer can spend more time thinking—which is perhaps why the shift to Scala has gained a reputation for being difficult. Moreover, since there is less code to go wrong and the compiler can check each keystroke, bugs are reduced to the point where “debugging” (running code and inspecting its state to know what it does) is a practice that needs to be unlearned.

Step 2: Understand Which Resources to Optimize

The following table shows the priority of optimizations you should make in your algorithm design.



Step 3: Use Efficient Data Types

Premature optimization is the root of all evil. - Donald Knuth

For big data, this statement is wrong. Knuth’s argument against premature optimization is that it usually makes program logic harder to debug. But given that functional programming virtually eliminates bugs, the main residual source of problems in big data code is resource issues. Identifying resource issues is hard, though, because Spark’s powerful functional pipelining of stages means that if your code doesn’t work, it’s hard to tell which part of the code is to blame. So optimizing simple things like data types from the start quickly pays off.

Try to pack as much data as possible into as few bytes as possible. Avoid Strings; index them instead. Pack “192.168.0.1” into an Integer. Use domain-specific knowledge. (For example, we knew the range of values for six Integers in a case class was limited, so we packed them all into a single Long!) Indexing can add some time to your job, but with Spark’s zipWithIndex it shouldn’t take much effort to code. In fact, we crush many of our datasets into HashMaps, which we can then broadcast over the cluster, eliminating joins and giving us great flexibility in our algorithm design. All these steps contribute a “constant” time cost, which means that as the data scales, and in our case as the number of queries scales, the cost of packing, indexing, caching, and broadcasting virtually doesn’t change.
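
As a rough illustration of the packing and indexing (the field widths, names, and local Spark setup are assumptions for the sketch, not our production values):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PackingExample {
  // Hypothetical bit-packing: six fields known to fit in 10 bits each (values 0..1023)
  // are packed into a single Long instead of a six-field case class.
  def pack(fields: Seq[Int]): Long =
    fields.foldLeft(0L)((acc, f) => (acc << 10) | (f & 0x3FF))

  def unpack(packed: Long, n: Int = 6): Seq[Int] =
    (0 until n).map(i => ((packed >>> (10 * (n - 1 - i))) & 0x3FF).toInt)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("packing"))

    // Replace Strings with compact indices using zipWithIndex, then broadcast the
    // small dictionary to every executor so later lookups never need a join.
    val industries = sc.parallelize(Seq("hairdresser", "electrician", "hairdresser"))
    val dictionary: Map[String, Long] = industries.distinct().zipWithIndex().collect().toMap
    val dictBc = sc.broadcast(dictionary)

    println(unpack(pack(Seq(1, 2, 3, 4, 5, 1023))))   // Vector(1, 2, 3, 4, 5, 1023)
    sc.stop()
  }
}
```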

More code means more tests. We used ScalaCheck’s powerful property-based testing (PBT) DSL to write an exhaustive test suite. In PBT we define properties, not cases. (We just tell the framework that our data packing and unpacking functions need to compose to produce the identity function; ScalaCheck does the work of finding edge cases, counterexamples, off-by-one errors, and so on.) As discussed previously, this framework also gives us the required confidence regarding near-absolute correctness.
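
For instance, a single ScalaCheck property of the kind described (reusing the hypothetical pack/unpack helpers from the previous sketch) looks like this:

```scala
import org.scalacheck.{Gen, Properties}
import org.scalacheck.Prop.forAll

// A property, not cases: packing then unpacking must be the identity for any six
// values in the agreed 10-bit range. ScalaCheck generates and shrinks the inputs.
object PackingSpec extends Properties("packing") {
  val tenBitFields: Gen[List[Int]] = Gen.listOfN(6, Gen.choose(0, 1023))

  property("unpack composed with pack is the identity") = forAll(tenBitFields) { fields =>
    PackingExample.unpack(PackingExample.pack(fields)) == fields
  }
}
```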

Step 4: Minimize MapReduce Stages and Shuffle Cost

Spark’s rich API facilitates writing MapReduce stages as elegant one-liners. Of course, short code doesn’t mean short run times, which is where functional programming, particularly thinking in terms of algebraic types, is also required.

The classic example is computing an average. One does not want to make one pass over the data to compute totals and a second pass to compute counts, even if the data is cached in memory. In Scala, this task is easy.
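
A minimal sketch of the idea (assuming transactions is an RDD[(Long, Long)] of (businessId, amountInPence) pairs; this is not the exact production snippet):

```scala
import scalaz.Scalaz._   // brings the |+| semigroup operator into scope

// Carrying the sum and the count together means a single pass over the data and a
// single shuffle produce the average for every key.
val averageSpend = transactions
  .mapValues(amount => (amount, 1L))     // (sum contribution, count contribution)
  .reduceByKey(_ |+| _)                  // pairs combine element-wise
  .mapValues { case (total, count) => total.toDouble / count }
```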

We use the “magic semigroup operator,” |+|, from Scalaz to do the aggregation for us; it works for just about every “algebraic” data type you can think of: sparse vectors, histograms, sets, maximums, minimums, and so on. reduceByKey will only shuffle the results of sub-aggregations in each partition of the data. A groupByKey would shuffle all the data, which is slow, and only then start aggregating, which can cause OOMs. Furthermore, we did two aggregations (the sum and the count) at the same time. This principle extends to N aggregations and to non-trivial forms of aggregation.

The DataFrames API produces equally terse and readable code, but the functional API fits our use case better. The key reasons are:

Large code bases require static typing to eliminate trivial mistakes like “aeg” instead of “age” instantly.

Complex code requires transparent APIs to communicate design clearly.

The 2x speedups the DataFrames API gains via under-the-hood mutation can be equally achieved by encapsulating state via OOP and using mapPartitions and combineByKey (see the sketch after this list).

Flexibility and Scala features are required to build functionality quickly.
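
To illustrate the third point, here is a sketch (a hypothetical helper, not our production code) of how combineByKey with a small mutable accumulator keeps the mutation private while the function offered to callers stays pure:

```scala
import org.apache.spark.rdd.RDD

// The mutation lives entirely inside this accumulator, so no per-record tuples are
// allocated, yet averageByKey itself is still a pure RDD-in, RDD-out function.
class SumCountAcc(var sum: Long = 0L, var count: Long = 0L) extends Serializable {
  def add(amount: Long): SumCountAcc = { sum += amount; count += 1; this }
  def merge(that: SumCountAcc): SumCountAcc = { sum += that.sum; count += that.count; this }
  def mean: Double = sum.toDouble / count
}

def averageByKey(rdd: RDD[(Long, Long)]): RDD[(Long, Double)] =
  rdd.combineByKey(
    (amount: Long) => new SumCountAcc().add(amount),       // create one accumulator per key per partition
    (acc: SumCountAcc, amount: Long) => acc.add(amount),   // fold each record into it in place
    (a: SumCountAcc, b: SumCountAcc) => a.merge(b)         // merge partial results across partitions
  ).mapValues(_.mean)
```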

Step 5: Other Tricks

Steps 1-4 are pretty much all you need, but we used some other tricks to get our Insights Engine to super-scale. First, we use large broadcasted HashMaps in preference to RDDs whenever possible, using Step 3 to keep down the memory footprint. This means our RDDs now only need to carry around a key, and we look up the other data as and when we need it.
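
A sketch of that lookup pattern (the names and types are hypothetical, and sc and transactions are assumed to exist from earlier steps):

```scala
// The details table is broadcast once to every executor; the RDD carries only a key,
// and the map-side lookup replaces what would otherwise be a shuffling join.
val businessDetails: Map[Long, (String, String)] =        // businessId -> (industry, town)
  Map(1L -> ("hairdresser", "Blackpool"), 2L -> ("cafe", "Leeds"))
val detailsBc = sc.broadcast(businessDetails)

val keyedByPeerGroup = transactions.map { case (businessId, amountInPence) =>
  val (industry, town) = detailsBc.value(businessId)
  ((industry, town), amountInPence)
}
```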

We also extend the trick of using monoids (things that can naturally be added together) to break the job down into a dozen stages, starting at the business level, then aggregating that level to give the finest level of location and industry, and so on until we have a single data point representing every business in the whole UK. This approach drastically improves performance, a bit like how summing N numbers in parallel takes O(log(N)) time (given sufficient CPUs).
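
A sketch of that roll-up (reusing Scalaz’s |+| from earlier, and assuming perBusiness holds ((industry, town, businessId), (sum, count)) pairs produced by an earlier stage):

```scala
// Each level re-aggregates the much smaller output of the level below, so the later
// stages cost almost nothing compared with the first pass over the raw transactions.
val perIndustryAndTown = perBusiness
  .map { case ((industry, town, _), agg) => ((industry, town), agg) }
  .reduceByKey(_ |+| _)

val perIndustry = perIndustryAndTown
  .map { case ((industry, _), agg) => (industry, agg) }
  .reduceByKey(_ |+| _)

val wholeUK = perIndustry.values.reduce(_ |+| _)   // one data point: every business in the UK
```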

Monoids can have practical restrictions, though; for example, taking unions of sets to count uniques can be memory-inefficient. Our use case doesn’t require delivering actual datasets to users (a full sort, for example), so, perhaps surprisingly, monoids serve virtually all our requirements via imaginative “algebraic data structures,” such as capped-size structures, convergent probabilistic structures (see BlinkDB), and so on.
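
As a toy example of a capped structure (not our production data structure): an exact distinct-count that, once it grows past a cap, keeps only the fact that the cap was exceeded. That is all a minimum-peer-count privacy rule needs, and the combine operation stays commutative and associative.

```scala
// Once the cap is exceeded we stop carrying elements, so memory stays bounded while
// questions like "are there at least n distinct values?" (for n up to the cap)
// remain answerable.
case class CappedSet[A](items: Set[A], overflowed: Boolean) {
  def combine(that: CappedSet[A]): CappedSet[A] = {
    val merged = items ++ that.items
    if (overflowed || that.overflowed || merged.size > CappedSet.Cap)
      CappedSet(Set.empty[A], overflowed = true)
    else
      CappedSet(merged, overflowed = false)
  }
  def distinctAtLeast(n: Int): Boolean = overflowed || items.size >= n
}

object CappedSet {
  val Cap = 1000
  def empty[A]: CappedSet[A] = CappedSet(Set.empty[A], overflowed = false)
  def one[A](a: A): CappedSet[A] = CappedSet(Set(a), overflowed = false)
}
```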

Finally, we use “sealed families of singleton objects extending functions” to organize our filters, maps, and aggregations. In a nutshell, that means we can check equality on code paths so as not to duplicate execution—which provides seemingly auto-magical distributed “memoization,” but not via traditional AOP mechanisms.
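
A sketch of the shape of that organization (reusing the hypothetical Transaction case class from the first sketch):

```scala
// Because every filter is a singleton object, two insight specifications that use the
// same filter are equal on that code path, so the engine can spot shared work with a
// simple equality check and compute each distinct stage only once.
sealed trait TransactionFilter extends (Transaction => Boolean)

case object ElectricityOnly extends TransactionFilter {
  def apply(t: Transaction): Boolean = t.category == "electricity"
}

case object CardPaymentsOnly extends TransactionFilter {
  def apply(t: Transaction): Boolean = t.category == "card"
}

// e.g. if (specA.select == specB.select) the filtered, aggregated result can be shared.
```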

Conclusion

As you can see, the combination of functional programming and Spark made our pretty hard problem pretty easy!
