Spark and Kafka - Getting Cozier

I’m a huge fan of the reappearance of Enterprise Service Buses. They are especially great for Big Data systems and the Lambda Architecture: messages get sent to different streams (topics) on the bus, and consumers can read them as a streaming or a batch operation, as desired.

(a good introduction to the idea of an Enterprise Service/Message Bus from last year)

Obviously, you wait for a decent Enterprise Service Bus/Data Stream Bus/PubSub/Messaging Log¹ and then many come along at once. One of the most popular in recent times is Apache Kafka - developed at LinkedIn to handle their huge throughput requirements. It’s quickly become a de facto component of many a Spark Streaming or Storm solution.

In the world of Spark, though, Kafka integration has always been a bit of a pain. If you look at this guide to integrating Kafka and Spark, it’s clear that wrangling more than a simple connection to Kafka involves quite a bit of faff: to get any parallelism you have to create multiple DStreams and union them back together as they come in from Kafka (see the sketch below). Spark is supposed to be easier to work with than that!
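
For reference, the old receiver-based approach looked roughly like this (a minimal sketch, assuming the Spark 1.2-era KafkaUtils.createStream; the ZooKeeper address, consumer group and topic name are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf, Seconds(5))

// Each createStream call sets up one receiver; to consume a topic's
// partitions in parallel you create several streams and union them.
val numStreams = 3
val kafkaStreams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream(ssc, "zookeeper-1:2181", "example-consumer-group",
    Map("example-topic" -> 1))
}
val unifiedStream = ssc.union(kafkaStreams)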

Well, in Spark 1.3, a new interface to Kafka was added. It’s still (in 1.4) marked as ‘experimental’, but I know of several companies who have been using it in production for months, handling billions of messages per day (if you’re still cautious, I imagine it will be marked as stable in 1.5). And it makes things so much simpler!


import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf, Seconds(5))

// No ZooKeeper quorum needed - just point straight at the Kafka brokers
val kafkaParams = Map("metadata.broker.list" -> "kafka-1:9092,kafka-2:9092,kafka-3:9092")

val topics = Set("example-topic", "another-example-topic")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

stream.map(_._2) // each element is a (key, message) pair - and then do Spark stuff!
// ...
ssc.start()
ssc.awaitTermination()

This automatically creates a DStream made up of KafkaRDDs, with one RDD partition per Kafka partition, all read in parallel. No union required! As a bonus, because Spark itself tracks the offsets that have been read, bypassing ZooKeeper, the new approach gains exactly-once semantics (with the downside that ZooKeeper no longer knows exactly what the Spark Streaming application is doing, which may cause some monitoring issues unless you manually update ZooKeeper from within Spark).
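
If you do still want those offsets visible outside Spark (for monitoring, or to update ZooKeeper yourself), each KafkaRDD produced by the direct stream exposes the ranges it read via HasOffsetRanges. A minimal sketch - the println is a stand-in for whatever offset store or monitoring tool you use, and the cast has to be done on the stream returned by createDirectStream, before any other transformation:

import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // The direct stream's RDDs carry the Kafka offset ranges they were built from
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { range =>
    // placeholder: push range.topic / range.partition / range.untilOffset
    // to ZooKeeper or your monitoring tool of choice here
    println(s"${range.topic} [${range.partition}]: ${range.fromOffset} -> ${range.untilOffset}")
  }
}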

Also in 1.3 and above - batch access to Kafka! You can create KafkaRDDs and operate on them in the usual way (a boon if you’re working on a Lambda Architecture application).


import org.apache.spark.streaming.kafka.OffsetRange

val offsetRanges = Array(
  // args are: topic, partitionId, fromOffset (inclusive), untilOffset (exclusive)
  OffsetRange("example-topic", 0, 110, 220)
)

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)

(In batch mode, you currently have to manage reading from all the partitions and the offsets in the Kafka log yourself.)
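
For example, if you track the offsets you’ve consumed yourself (the partition ids and offset numbers below are entirely made up for illustration), you end up building one OffsetRange per partition before calling createRDD:

// Hypothetical offsets tracked by your own application, keyed by partition id
val lastProcessed = Map(0 -> 110L, 1 -> 95L, 2 -> 120L)    // where the last run finished
val latestAvailable = Map(0 -> 220L, 1 -> 180L, 2 -> 240L) // the current head of each partition

val offsetRanges = lastProcessed.map { case (partition, from) =>
  OffsetRange("example-topic", partition, from, latestAvailable(partition))
}.toArray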

Okay, so we can now do parallelism with Spark and Kafka in a much simpler manner… but an important feature of these architectures is writing results back to the bus (e.g., flagging possible fraudulent bids in a real-time auctioning system for further investigation). Unfortunately, baked-in support for this isn’t scheduled until 1.5 (see SPARK-4122 for more details), so for now you have to handle the details yourself - consider a connection pool if you find yourself doing many writes back to Kafka in a short time.
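
Until then, the usual workaround is to open a producer on the executors inside foreachPartition, so you get one producer per partition of each batch rather than one per record. A rough sketch using the standard Kafka producer client (the output topic and broker list are placeholders, and in a real job you’d want the pooled or lazily-initialised producer mentioned above rather than a fresh one per batch):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

stream.map(_._2).foreachRDD { rdd =>
  rdd.foreachPartition { messages =>
    // Built on the executor so the (non-serialisable) producer never leaves it
    val props = new Properties()
    props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    messages.foreach { message =>
      // placeholder topic: write flagged results back onto the bus
      producer.send(new ProducerRecord[String, String]("flagged-bids", message))
    }
    producer.close()
  }
}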


  1. the cited difference between a data stream bus/log and an enterprise bus seems to be that traditional enterprise buses tended to do transformations on the bus itself, whereas systems like Kafka are much simpler and leave it up to the consumers to transform data (and possibly write it back to the bus under a different topic).