The Elder Scripts

SQL queries on Apache Kafka

March 16, 2018

Apache Kafka is a good example of a great product that knows both its strengths and its limits. One feature that Confluent, the company behind Kafka, apparently does not want to support is random access to the messages in the topics, or search queries on those messages. This is an exercise that Kafka’s documentation leaves to the reader.

To Confluent’s credit, they supply us with excellent libraries and frameworks, such as KSQL and Kafka Streams, that we can use to build our own queryable stores backed by Kafka topics. However, if you deal with Kafka, there comes a time when you just wish you could run a simple, boring SQL query to find the subset of data that causes some weird behaviour, or that your boss wants to see right this minute.

I’m glad to tell you that it is in fact possible, and you don’t have to write a single line of code to search, filter, slice and dice the Kafka topics, as long as they contain JSON messages. You can even use your favourite JDBC client to perform those SQL queries.

Now, with enough buildup, I can reveal that I am talking about a tool called Apache Drill. It’s a SQL query engine that can read data (and deduce schemas) from a variety of backends, such as Hadoop, MongoDB, Hive or a plain file system. As I found out recently, Drill has a plugin that allows us to use Kafka as a backend for its queries. In this post I will show how to set up this plugin and what capabilities it gives you.

In this post I’m assuming that you’re using macOS. The commands should also work on Linux without too much hassle.




Preparing a Kafka cluster

If you have read this far into the post, I assume that you know what Kafka is. On the off chance that you don’t: Kafka is a popular streaming platform built around the idea of a distributed log (distributed streaming what? you can find a more detailed explanation at kafka.apache.org/intro).

If you already have a cluster that you can play with, you can skip this section; otherwise read on to find out how to start up a Kafka cluster and push some data into it.

Quick installation

The easiest way to run a Kafka broker is to use a distribution that Confluent themselves provide, aptly named the Confluent Platform.

wget http://packages.confluent.io/archive/4.0/confluent-oss-4.0.0-2.11.tar.gz
tar -xvzf confluent-oss-4.0.0-2.11.tar.gz
cd confluent-4.0.0
./bin/confluent start


(Screenshot of the confluent start output: UP is good.)

You should see a bunch of messages reporting that the necessary services are “UP”. This means that we can proceed to push some messages into our new cluster. We will not use most of these services in this exercise, but for simplicity’s sake we start the Confluent Platform in its default configuration.
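If anything misbehaves later on, the same CLI that ships with this distribution can report the state of each service again. A quick status check looks like this:

./bin/confluent status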

Producing some test messages

Run the Kafka producer for a topic called drilltest:

./bin/kafka-console-producer --broker-list localhost:9092 --topic drilltest

Then, at the > prompt, you can enter some JSON messages, for example:

{ "a": 123, "b": "drilling" }
{ "a": 321, "b": "not anymore", "c": "testing" }

You will probably see an error complaining that a LEADER is not AVAILABLE. Disregard it: this is only Kafka complaining that it doesn’t know the topic yet; the topic will be auto-created for us.
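If you’d like to double-check that the messages have actually landed in the topic, you can read them back with the console consumer that ships alongside the producer (press Ctrl+C to stop it):

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic drilltest --from-beginning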

Preparing Apache Drill

Now that we have a Kafka cluster that we’d like to explore, let’s proceed with installing and configuring Apache Drill.

Installation

Run the following commands to download, unpack and run Drill:

wget http://apache.mirrors.hoobly.com/drill/drill-1.12.0/apache-drill-1.12.0.tar.gz
tar -xvzf apache-drill-1.12.0.tar.gz
cd apache-drill-1.12.0
./bin/drill-embedded

Drill will take several seconds to start up, and then you should see a quirky welcome message (I got: “the only truly happy people are children, the creative minority and drill users”) and a prompt that looks like this: 0: jdbc:drill:zk=local>. We can test Drill’s query capabilities by trying it out on a filesystem backend.

First, prepare a file to query:

echo '[{"a": 1}, {"a": 2}, {"a": 3, "b": "dfs test"}]' > /tmp/dfstest.json

Then, run a query in Drill:

SELECT * FROM dfs.tmp.`dfstest.json`;

You should see the data from the JSON file laid out neatly in columns:
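Something like this sketch (the exact sqlline formatting may vary between Drill versions):

+------+-----------+
|  a   |     b     |
+------+-----------+
| 1    | null      |
| 2    | null      |
| 3    | dfs test  |
+------+-----------+
3 rows selected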

If you do, that means that Drill is properly installed! Note that it has guessed what a schema for our data could look like. Now we can configure Drill to read from Kafka.

Pointing Drill at Kafka

For this part we are going to use the web UI of Apache Drill (I encourage you to look around in it; it has a few tools you might find useful). Open localhost:8047/storage in your browser, find Kafka in the list of Disabled Storage Plugins and click Update next to it. You’ll get to the configuration page for the Kafka plugin. Enter the following into the big text field:

{
  "type": "kafka",
  "kafkaConsumerProps": {
    "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "auto.offset.reset": "earliest",
    "bootstrap.servers": "localhost:9092",
    "enable.auto.commit": "true",
    "group.id": "drill-query-consumer-1",
    "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "session.timeout.ms": "30000"
  },
  "enabled": true
}

Then click Update and Back. That’s all it takes! Finally we can unleash Drill’s queries on the drilltest Kafka topic that we created earlier.
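Back in the Drill prompt, you can run a quick sanity check before querying: list the schemas Drill knows about, and a kafka entry should now appear among them:

SHOW DATABASES;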

Finally, SQL queries on a topic

In the Drill prompt enter:

USE kafka;
SHOW tables;
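The output should look roughly like the sketch below. The Confluent Platform creates a few internal topics of its own (such as _schemas, used by the Schema Registry), so your list will likely be longer:

+---------------+-------------+
| TABLE_SCHEMA  | TABLE_NAME  |
+---------------+-------------+
| kafka         | _schemas    |
| kafka         | drilltest   |
+---------------+-------------+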

All Kafka topics that currently exist in the cluster show up as tables, including our drilltest. Proceed with:

SELECT * FROM drilltest;

You should see all the messages that you’ve sent to your topic earlier:
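Something like the following sketch, one row per message; the timestamp values are illustrative epoch milliseconds, and the column widths are trimmed to fit:

+------+--------------+----------+-------------------+---------------------+--------------------+
|  a   |      b       |    c     | kafkaPartitionId  | kafkaMessageOffset  | kafkaMsgTimestamp  |
+------+--------------+----------+-------------------+---------------------+--------------------+
| 123  | drilling     | null     | 0                 | 0                   | 1521195000000      |
| 321  | not anymore  | testing  | 0                 | 1                   | 1521195010000      |
+------+--------------+----------+-------------------+---------------------+--------------------+
2 rows selected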

Let’s try something more sophisticated:

SELECT * FROM drilltest WHERE a = 123;

Note that next to the data from the JSON messages the table also includes metadata columns like kafkaPartitionId, kafkaMessageOffset and kafkaMsgTimestamp. I’ve found those immensely useful, because they allow you to query messages based on their time of arrival or to learn something about the shape of your data. For example, this is how we find the earliest and the latest message timestamp in each partition of the topic:

SELECT kafkaPartitionId,
    from_unixtime(MIN(kafkaMsgTimestamp)/1000) AS minKafkaTS,
    from_unixtime(MAX(kafkaMsgTimestamp)/1000) AS maxKafkaTS
    FROM drilltest GROUP BY kafkaPartitionId;
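The same column works well for slicing a topic by time. Here is a sketch of a query that only looks at messages produced within the last hour; it leans on Drill’s unix_timestamp() function, which returns the current time in epoch seconds:

SELECT a, b, from_unixtime(kafkaMsgTimestamp/1000) AS msgTime
    FROM drilltest
    WHERE kafkaMsgTimestamp > (unix_timestamp() - 3600) * 1000;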

Exploring Kafka topics from DBeaver

At the beginning of the post I promised that you can use a JDBC client to run SQL queries on Kafka topics, and now it’s time to fulfil that promise. My client of choice is DBeaver. If you’d like to follow along, you can download it here: dbeaver.jkiss.org/download. Keep Drill running for this exercise.

Once DBeaver is installed, run it and click the leftmost button in the menu bar to create a new connection. You will see the list of drivers available in your system. Expand the Hadoop entry and you will find the Apache Drill driver. Select it and click Next. You’ll get to the Connection settings screen. Now you just need to set the connection details, specifically the JDBC URL. Click Edit Driver Settings and in that screen change the URL Template to jdbc:drill:drillbit=localhost. Click OK to go back and make sure that your change has been applied. You can test the connection to make sure everything is tied together, and then click Finish. (If you get errors while configuring the driver, make sure that you have the latest versions of both Drill and DBeaver; I am using Drill 1.12.0 and DBeaver 5.0.0.)

This is it! Now you have a connection to Drill, and you can expand the kafka schema to find the topics (which are called Tables in this view). DBeaver allows you to treat those topics as if they were database tables, so you can view the data and run queries on it.

Isn’t it beautiful?

Limitations and further reading

The Drill release I am using includes the Kafka plugin, and you can find its source code and documentation on GitHub. However, at the moment the plugin only works with JSON messages; the developers plan to support Avro in a future release.

You might find that a query on a big, busy Kafka topic can be rather slow. This is not Drill’s fault; it is a consequence of how Kafka is designed. For this reason I wouldn’t recommend using Drill queries on Kafka in hot paths in production; this feature is better suited for development and ad-hoc support scenarios.

For the purposes of this tutorial we have started Drill in the simplest embedded mode. If you like what the tool has to offer, you can find out how to run it in a cluster setup in the Drill Documentation.

In this tutorial I was using the following versions of tools:


Apache Drill 1.12.0
Apache Kafka 1.0.0-cp1
Confluent Platform 4.0.0
DBeaver 5.0.0
Yury Liavitski