UDAFs In Spark

Today, we’re going to talk about User-Defined Aggregation Functions (UDAFs) and Dataset Aggregators in Spark, similar but different ways of adding custom aggregation abilities to Spark’s DataFrames. This may be of interest to some. For the rest of you, umm…I suggest a cup of tea and some digestives instead.

User-Defined Aggregation Functions

UDAFs as a concept come from the world of databases, and are related to User-Defined Functions (UDFs). While UDFs operate on a column in a row in an independent fashion (e.g. transforming an entry of ‘5/2/2016’ into ‘Monday’), UDAFs operate across rows to produce a result. The simplest example would be COUNT, an aggregator that just increments a value for every row it sees in the database table and then returns that number as a result. Or SUM, which adds up the values of a column across every row in the table.

Or, to put it another way: UDFs are map(), UDAFs are reduce().
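To make that analogy concrete, here's a minimal, framework-free sketch in plain Python (not Spark - the variable names are purely illustrative):

```python
rows = [1.0, 2.0, 4.0]

# "UDF" behaviour: a per-row transformation, one output per input row (map)
doubled = [x * 2 for x in rows]

# "UDAF" behaviour: an aggregation across rows, one output for all inputs (reduce)
total = 0.0
for x in rows:
    total += x
```

The UDF never needs to know about any row other than the one it's looking at; the UDAF has to carry state across all of them.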

Normally things like SUM and COUNT will be built into the data manipulation framework you’re using; UDAFs come into their own for implementing custom logic in a reusable manner. If you and your team often need to generate a custom probability distribution for your warehouses’ delivery times, maybe you can implement it as a UDAF once and then everybody can get access to it without having to reimplement the logic over repeated queries.

UDAFs in Spark

Adding a UDF in Spark is simply a matter of registering a function. UDAFs, however, are a little more complicated. Instead of a function, you have to implement a class that extends UserDefinedAggregateFunction. Here’s a UDAF that implements harmonic mean:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class HarmonicMean() extends UserDefinedAggregateFunction {

  def deterministic: Boolean = true
  def inputSchema: StructType = StructType(Array(StructField("value", DoubleType)))
  def dataType: DataType = DoubleType
  def bufferSchema: StructType = StructType(Array(
    StructField("sum", DoubleType),
    StructField("count", LongType)
  ))

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.toDouble
    buffer(1) = 0L
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getDouble(0) + (1.0 / input.getDouble(0))
    buffer(1) = buffer.getLong(1) + 1
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  def evaluate(buffer: Row): Double = {
    buffer.getLong(1).toDouble / buffer.getDouble(0)
  }
}

Let’s walk through the class to see what each of the methods does. Firstly, deterministic() is a simple flag that tells Spark whether this UDAF will always return the same value if given the same inputs (in this example, the harmonic mean should always be the same given the same inputs, or else we’ve got bigger problems). The next two methods, inputSchema() and dataType(), specify the input and output data formats. We’re not doing anything crazy here - just requiring that our input column is a double and that our output will also be a double. You’re free to create UDAFs with weird and wonderful type signatures though, bringing in multiple columns and outputting anything you like.

With those out of the way, we now need to specify the schema of our buffer. The buffer is a mutable object that will hold our in-progress calculations. For calculating the harmonic mean, we’re going to need a running sum of the reciprocals, plus another variable to count the numbers we’ve seen for the final calculation. The bufferSchema is defined as a StructType, here with an array of two StructFields, one of type Double and the other of type Long.

Having finally set up all the types (come on, this is Scala! The typing is all the fun, right? Right? Anybody?), we can implement the methods that will calculate our mean. As you might expect, initialize() is called first. Here, we’re making sure that both the sum and the count fields are initialized with zero values.

update() and merge() are where your aggregations happen. update() takes two arguments: a MutableAggregationBuffer where aggregation has already taken place, and a new Row which needs to be processed. In this example, we add the reciprocal of the incoming row’s value to the buffer and increment the count (note that we don’t take a reciprocal of the buffer’s value, because the contents of the buffer have already been processed).

merge(), on the other hand, merges two already-aggregated buffers together. This is needed because Spark will likely split the execution of the UDAF across many executors (which is what you’d want, of course!) and it needs a way of combining those aggregations for the result. Here, like in many UDAF examples that compute a mean, the merge() is very straightforward. We just need to sum the two different counts, and the two different sums of reciprocals. Your custom merging logic may be more complicated than this.1
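To see how the four lifecycle methods fit together, here's a plain-Python simulation (no Spark involved - the function names simply mirror the UDAF methods, and the two-element list mirrors the bufferSchema of [sum of reciprocals, count]):

```python
def initialize():
    # Fresh buffer: [sum of reciprocals, count], matching bufferSchema
    return [0.0, 0]

def update(buffer, value):
    # Fold one incoming row into a partition's buffer
    buffer[0] += 1.0 / value
    buffer[1] += 1

def merge(b1, b2):
    # Combine two already-aggregated buffers (b2 folded into b1)
    b1[0] += b2[0]
    b1[1] += b2[1]

def evaluate(buffer):
    # Final result: count divided by the sum of reciprocals
    return buffer[1] / buffer[0]

# Two "executors" each aggregate their own partition of the data...
part1, part2 = [1.0, 2.0], [4.0]
b1, b2 = initialize(), initialize()
for v in part1:
    update(b1, v)
for v in part2:
    update(b2, v)

# ...then the partial buffers are merged and evaluated once.
merge(b1, b2)
result = evaluate(b1)  # harmonic mean of 1, 2 and 4
```

Spark decides for itself how many partitions (and hence partial buffers) there are; your merge() just has to be correct regardless of how the rows were split.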

Finally, there’s evaluate(). This gets called at the end of the UDAF’s processing. In this example, evaluate() actually produces the harmonic mean result we’re looking for by dividing the count by the sum of the reciprocals.

Using UDAFs

Having defined the UDAF, how do you actually use it? Well, like UDFs, you get two choices. Firstly, there’s the fairly obvious method of using it in DataFrame aggregations, like this:

val hm = new HarmonicMean()
val df = sc.parallelize(Seq(1.0, 2.0, 4.0)).toDF("value")
df.agg(hm(df("value"))).show()

But you can also register the UDAF and use it transparently within SparkSQL queries:

sqlContext.udf.register("hm", new HarmonicMean())
df.registerTempTable("df")
sqlContext.sql("SELECT hm(value) AS hm FROM df")

As you can imagine, the latter method is a great way of providing additional functionality to your Spark platform which can be introduced to your analytics team without having to step outside of their SQL comfort zone.

Behind the scenes

UDAFs are implemented via ScalaUDAF, a class that extends ImperativeAggregate (one of the two flavours of AggregateFunction available in Spark - the other being DeclarativeAggregate, which works directly with Catalyst expressions rather than the row-based approach of ImperativeAggregate).

You can trace through the AggregationIterator class to see how Spark walks through the execution of the aggregators - it’s not especially pretty, but it does work!

What Happened to Dataset Aggregators?

I spent a bit longer on the UDAFs than I planned, so I’ll do a separate follow-up post where I look at Dataset Aggregators.

  1. Essentially ‘may you live in interesting times’, but for Spark. [return]

Kentucky! No, Ohio! No, Kentucky! No, Ohio!

Back from Cincinnati again, and this time I can say I saw more of the city. Including, perhaps, the most insane supermarket I have ever seen.

A photo posted by Ian Pointer (@carsondial) on

This is Jungle Jim’s, perhaps the only supermarket to think “I know, let’s jam another supermarket on to the end of the original one and go crazy!”

A photo posted by Ian Pointer (@carsondial) on

Yes, that’s Robin Hood standing over the British foods section. There’s also an island, a boat, a fire engine, singing cereal mascots…and…look, I can’t do it justice. It even imports Tesco and Sainsbury products, for goodness’ sake, and somehow manages to get proper Cadbury chocolate (don’t ask me how, given the Hershey clampdown). It is almost every American shopping stereotype brought to life…complete with a monorail out front. Simply wonderful.

Somewhat late to the party, I finished watching the first series of Detectorists this week as well. I had given it a wide berth because of Crook, and I was totally wrong. It turned out to be exactly the gentle comedy that I never thought could come from somebody involved with The Office. And I’ll confess that at times it even made me a little homesick. It might be nice to go home one year when it isn’t Winter.

One thing that has been nice about travelling so far this year is that I’m going to places where I actually know people (okay, so not Nashville or Atlanta, but my hit-rate is much better this year)! This time I got to meet up with Tammy to see where she’ll hopefully end up living in a couple of weeks’ time. While it’s technically in Kentucky, you can literally walk in a straight line for half-an-hour, go over a bridge and you’re back in Ohio. I could entertain myself for a fair portion of the afternoon simply by walking across states. I have issues.

Whilst you wouldn’t normally put ‘hipster’ and Cincinnati together, it is a decent-sized city and that means that they are present.

A photo posted by Ian Pointer (@carsondial) on

It’s a converted post office that is now a restaurant selling fried chicken! Surrounding it: a high-end pet shop, hipster shops selling homewares, and of course the bars spilling out into the street and the Sunday sun. So yes, Over-The-Rhine is très hipster. But, you know what? It felt inclusive in a sense that you don’t often get in Durham. Plus the lunch at The Eagle cost less than half what you’d pay here in the Triangle.

Oh, and the beer cellar where we played board games also has this shop for all your lederhosen needs. What more could you possibly want from a city?

Mesos With Google Container Registry


Hey you crazy kids! Have you ever set up a Mesos cluster inside Google Compute Engine and wanted to pull Docker containers from Google’s Container Registry rather than having to stand up your own registry? Did you try it and get a bit lost? Because I certainly did, but it turns out that it’s really not that hard.

First, you’ll need to create a JSON service account key from the API Manager in the GCE console. Download that and create a new instance inside your environment (this instance is expendable - I just preferred making sure I wasn’t leaving anything on the Mesos control node myself).

Copy the JSON file to that instance and then log in. Then, issue this command:

    docker login -e 1234@5678.com -u _json_key -p "$(cat [JSON_FILE])" https://gcr.io

(The email address doesn’t have to be a legitimate address you control, but it does have to be a syntactically valid email address. Which is odd, but there you go.)

This saves the required credentials in that user’s .docker directory. So, let’s tar that up!

    tar czf docker.tar.gz .docker

Now, copy that file across to all your worker nodes (may I suggest Ansible? Just add that to your playbook during worker node creation), preferably to an obvious place like /etc (you could also store it on Google Cloud Storage instead!)

(oh, and you can destroy that sacrificial instance now!)

The JSON below is a (simplified!) Marathon entry for pulling a dashboard container from Google’s Container Registry instead of DockerHub:

    {
        "id": "dashboard",
        "cpus": 0.2,
        "mem": 512,
        "instances": 1,
        "container": {
            "type": "DOCKER",
            "docker": {
                "image": "gcr.io/[PROJECT]/dashboard",
                "network": "HOST"
            }
        },
        "uris": ["file:///etc/docker.tar.gz"]
    }

The magic is the uris array, which tells the Mesos worker where to find the credentials needed to log in to the private registry (in this case, Google’s).

And as for pushing your images up to the Container Registry? That’s pretty simple too:

  • Tag your image:
    • docker tag -f dashboard gcr.io/[PROJECT]/dashboard
  • Push your image:
    • gcloud docker push gcr.io/[PROJECT]/dashboard

And voilà! Your Mesos cluster can now pull from your private Container Registry!

(note, you may want to check the docs if you’re not in the US - you can push to and pull from a registry hosted nearer to you rather than just gcr.io - and for those of us in the US, we can’t rely on gcr.io always being the US, though it is currently)

This Quiet Darkness

As I dragged the new razor blade over my face, I instantly recognized the signs of a dull edge. There’s at least one in every bulk pack of Mach 3 blades I buy from Costco1. I didn’t stop. Sheer bloody-mindedness and laziness. With predictable results.

I think of myself as not being too vain, but there’s something that just screams failure when you step out of the shower with blood running down your chin. Thirty-seven years old, and can’t even shave without making a total mess of it.

Therefore, I will be staying inside for the rest of the day, waiting for the damn thing to clot properly and wondering where my next psoriasis flare is going to be found. I’m not really sure that this ‘getting older’ thing is all it’s cracked up to be…

  1. Yes, yes, Gillette are evil…but Dollar Shave Club and Harry’s are simple bundlers of Dorco razors, which I’ve found to be absolutely terrible. [return]

Things Learnt From The Family Visit

Picnic has good BBQ but the real draw is their apple crumble - the best I’ve tasted on this side of the Atlantic.

Waking up on the airbed facing a wall and taking 30 seconds to realize that no, I’m not at home in Bicester.

That my new rug has more square footage than the bedroom I spent three decades sleeping in.

Blue Bloods is a terrible TV show.

But Criminal Minds: Beyond Borders is so jingoistic and offensive that I think that it’s secretly staffed by a cabal of Russia Today writers.

I’m getting better at this pizza thing. One of them was almost round. I forgot to put cheese on it, but it was round…

Concrete cakes are great.

My sister’s Funko problem will soon require a 12-step programme.

Don’t forget that Dad doesn’t like working with water.

I have undone all my exercise to reduce the Christmas weight-gain in one week. Good job!

It is easy to forget that Durham has a load of great things going on.

While the psoriasis continues to spread, my liver is fine. So there’s that.

Nzinga’s is probably the best brunch ‘secret’ in Durham right now.

I can no longer use ‘mid-thirties’.

Going to Saltbox at 1pm on a Saturday is a recipe for sold-out sadness. Sorry, Mum!

I need to get googly eyes for Bob.

Everything is so quiet now.

I had 1000 teabags this morning. Now I have 999. THE COUNTDOWN BEGINS.

Everybody is Ill


A photo posted by Ian Pointer (@carsondial) on

I have 1.5kg of British mini eggs. This might, might be more than even I can deal with.

But I will try.

Having a full house is slightly odd. With the travelling this year, not sleeping in my own bed is less of an issue, but I can’t just go off and do my own thing at the moment. Of course, by ‘my own thing’ I mean ‘I can’t sit in pyjamas and watch 1970s British television all day and night’, so it might be a good thing, really. Plus I am going to get cake. And presents!

(My sister points out that I am not to over-sell the cake. But then when I said ‘it might be rubbish!’ I got accused of calling their proposed cake rubbish. So I can’t win. I’m sure it’ll be great, though!)

Today, sadly, everybody is ill. Which means people covered in blankets on the couch, me sent off to Target to find medicine, and a rather quiet Sunday. Almost, in fact, like Sundays back home. Maybe I should try and find an episode of Antiques Roadshow for later on this evening…


Less than a week to go until my family arrives again!

“Haven’t they just been here?”

No, that was just my Dad (and uncle and my uncle’s brother). This is not an installing-a-new-shower visit, but rather a visiting-Ian-on-his-birthday visit!


I may be excited. Of course, it does mean that I’ll be spending the week on an airbed (because I am a good brother and I’ll give up my bed for my sister), but there will be cake. Cake is important.

Oh, and also British mini-eggs. MINE! ALL MINE!

(Okay, so I may share. Maybe. If you are all very nice. But I’d come around quickly, as I apparently have previous form when it comes to mini-eggs)

And now, IKEA! Plus board games. Of course.

Easter 2016

Easter Holidays are a weird time here in the US. After growing up in a country where the four-day weekend is a national institution, it is deeply odd and depressing being in a place where the chocolates are mostly terrible and there’s no concept of the Bank Holiday Weekend.

(Christmas would have the same issue; but after one miserable Christmas here, I have fixed that by simply going home every year. It helps)

But, you have to get by, and so, I have tried to approximate the weekend. This has involved watching old episodes of Top of The Pops from 1981, a Lindt milk chocolate bunny, cleaning, buying outside furniture, ironing, and getting through as many episodes of The Sweeney as I can before catching up with Stewart Lee’s Comedy Vehicle. All while thinking I should be doing something more productive. So in that sense, it really is like Easter Weekend.

Also, it’s been raining solidly since 2am last night…the wet miserable rain helps a lot to set the proper mood.

Heath Robinson Rules OK

2016: where I boot a Linux VM with VirtualBox on Mac OS X with a USB passthrough driver so I can hook up an EXT3-formatted HDD and read the data from it, copying to a network share which is back on Mac OS X, but actually a mount point to a USB 3.0 pocket drive.

The most impressive part of all the above is that it all works. Although I may need 2 weeks to copy 2TB of information to the new drive.

I was planning on spending the weekend working on algorithms, digging into Spark 2.0, and being productive. But I watched How To Steal A Million instead and I think I made the right decision.

Oh, also: chamber vacuum sealer-enhanced cookie dough is now a thing.

Under pressure.

A photo posted by Ian Pointer (@carsondial) on


This entry is late, but I had a good excuse for not posting yesterday - I instead spent the day making a roast dinner for friends. I think my Yorkshire Pudding game is getting pretty good at this point:


A photo posted by Ian Pointer (@carsondial) on

(that, of course, is tempting fate for the next time I attempt them)

The Third Man is playing at The Carolina Theatre this week and you should see it if you can. If you think that Major Calloway is perhaps too terribly arch and British, I’d like to point out that the actor playing him had a clause in all his contracts that specified that he would not work on days where a cricket Test Match was being played. NONE MORE BRITISH.

(of course, he’s also in this film, which…well, how that ever got made continues to elude me. Or how come everybody involved didn’t get arrested)

This week, though, this week should be quieter. I am, maybe, just maybe…going to have a Saturday and Sunday all to myself for the first time since…January?

Also, when did it get to be mid-March? I feel like people slipped some extra days by me when I wasn’t looking…