Apache Kafka dürfte vielen Entwicklern vor allem als schnelles Queuing-System und als Quasi-Standard in diesem Bereich bekannt sein. Entsprechend wird es bisher vor allem als Puffer verwendet, um ankommende Daten vor der Verarbeitung zwischenzuspeichern oder einen möglichen Rückstau von Zwischenergebnissen und damit einen Nachrichtenverlust bei der Datenweitergabe zwischen verschiedenen Services zu vermeiden.
Kafka-Mastermind Jay Kreps hat allerdings große Pläne für sein Werkzeug, mit dem Einbau von Stream-Verabeitungsmöglichkeiten und sogar einer KSQL genannten SQL-Schnittstelle, plant er mit Kafka nichts weniger, als Datenbanken neu zu erfinden (vgl. auch: Turning the Database inside out) und hat damit bereits einiges an Neugierde ausgelöst. In diesem Blog-Beitrag möchte ich daher einige erste Ideen und Möglichkeiten aber auch aktuelle Beschränkungen von KSQL diskutieren und wie alle Schritte nachvollziehbar auflisten, die zum direkten Ausprobieren notwendig sind.
Getting Started
Der einfachste Weg KSQL auszuprobieren, ist sicher die Verwendung der sog. Confluent Platform (Download hier),
die alle dafür benötigten Einzelteile intergriert. Confluent (engl. für
zusammenfließend bzw. -wachsend) ist übrigens die Firma, die Jay Kreps
mit Kollegen nach seinem Weggang bei LinkedIn gegründet hat, um Kafka zu
vermarkten.
Voraussetzung zum Betrieb der Plattform ist ein installiertes Java 8 (neuere Versionen funktionieren aktuell leider noch nicht), dann kann sie nach dem Herunterladen und Entpacken direkt auf der Kommandozeile aus dem bin-Ordner durch Aufrufen von ./confluent start ausgeführt werden. Danach steht die KSQL-Shell per ./ksql zur Verfügung. SQL-Entwickler sollten sich hier schnell heimisch fühlen, da die üblichen SQL-Befehle wie gewohnt funktionieren, bspw. show tables;
Um eine Tabelle anlegen zu können, werden in KSQL allerdings sog. Kafka-Topics benötigt, in die die gewünschten Daten eingespielt werden. Für alle, die Kafka bisher nicht kennen, Topics sind Kafkas Ansatz zur Datenspeicherung und zunächst im Prinzip nichts weiter als große Listen bestehend aus Key-Value-Paaren. Um Daten in ein solches Topic hineinzubekommen gibt es, wer hätte es gedacht, ein weiteres Kommandozeilen-Skript (kafka-console-producer), das wie folgt aufgerufen werden kann:
./kafka-console-producer --broker-list localhost:9092 --topic exams --property "parse.key=true" --property "key.separator=:" < /pfad/prüfungen.json
Der Einfachheit halber pipen wir die Daten, die wir in das Topic hineinlegen möchten, aus einer Datei names prüfungen.json heraus. Diese könnte bspw. folgende Daten für Prüfungsleistungen an einer Hochschule oder Universität enthalten:
1:{"id":"1","name":"Big Data","note":1.0,"matrikelnummer":"12345"}
5:{"id":"5","name":"Programmierung 1","note":2.0,"matrikelnummer":"23456"}
17:{"id":"17","name":"Programmierung 1","note":4.0,"matrikelnummer":"12345"}
5:{"id":"5","name":"Programmierung 1","note":2.0,"matrikelnummer":"23456"}
17:{"id":"17","name":"Programmierung 1","note":4.0,"matrikelnummer":"12345"}
Wie leicht erkennbar ist, enthält die Datei drei Zeilen mit Key-Value-Paaren, die jeweils durch einen Doppelpunkt unterteilt werden. Der Value besteht aus einem JSON-Eintrag, der die eigentlichen Prüfungsdaten enthält. In obigem Beispiel sind das eine ID, ein Vorlesungsname, die erreichte Note sowie die Matrikelnummer des Studierenden, der die Prüfung abgelegt hat, als Fremdschlüssel. Der Key des Key-Value-Paares sollte dabei immer dem Primärschlüssel des Values, in unserem Fall also der Prüfungs-ID, entsprechen.
Nun können wir in der KSQL-Shell nachschauen, ob alles korrekt funktioniert hat, in dem wir uns zunächst alle Topics ausgeben lassen: show topics;
Wir haben damit (endlich) eine schöne Möglichkeit, die in Kafka enthaltenen Daten zu erkunden. Taucht dort nämlich das Topic mit dem zuvor gewählten Namen exam auf, können wir uns auch seinen Inhalt folgendermaßen anzeigen lassen: print 'exams' from beginning;
Doch... die Abfrage scheint hängen zu bleiben..? DON'T PANIC, it's not a bug, it's a feature. Da Topics im Real-Betrieb normalerweise fortlaufend mit Daten gefüttert werden, sind zunächst alle Queries in KSQL sogenannte Continuous Queries, die so lange laufen und neu in das Topic kommende Daten ausgeben, bis sie abgebrochen werden. Das geht wie auf der Kommandozeile üblich mit Control + C.
Nun haben wir alle Vorbereitungen getroffen, um in KSQL wie folgt eine Tabelle anlegen zu können:
create table exams (id varchar, name varchar, note double, matrikelnummer varchar) with (kafka_topic = 'exams', value_format='json', key = 'id');
Der erste Teil ist dabei (fast) normaler SQL-Code, im zweiten Teil werden einige Metadaten übergeben, die KSQL mitteilen, mit welchem Topic die Tabelle verbunden werden soll.
(K)SQL-Abfragen
Wie von SQL gewohnt können wir uns nun das Schema über describe exams; anzeigen lassen, dabei wird deutlich, dass Kafka automatisch zwei weitere Spalten zu unseren Daten hinzugefügt hat, nämlich die ROWTIME und den ROWKEY. Erstere ist der Timestamp, wann Kafka den Datensatz empfangen hat, zweitere ist der Key des jeweiligen Key-Value-Paars aus dem Kafka-Topic, der mit dem Inhalt der Key-Spalte, die in der With-Klausel genannt ist identisch sein muss. Ferner können wir uns natürlich auch den Inhalt der Tabelle mit einer normalen Select-Abfrage ausgeben lassen:
select * from exams;
Doch Moment, wir sehen, dass wir außer einer "hängenden Query", die sich mit Ctrl-C beenden lässt, wieder nichts sehen...? Das liegt daran, dass Kafka per default so konfiguriert ist, dass es nur neu ankommenden Einträge in einem Topic auswertet (daher zuvor auch das "from beginning"). Wir könnten also a) neue Einträge an das Topic schicken oder b), wenn wir alle vorhandenen Einträge in der Select-Abfrage angezeigt bekommen möchten, müssen wir vor der nächsten Anfrage folgendes Kommando auf der KSQL-Shell absenden (und das nach jedem Neustart der Shell):
SET 'auto.offset.reset' = 'earliest';
Nun sollte unsere Select-Abfrage problemlos funktionieren, daher können wir auch gleich noch eine Where-Klausel an sie anhängen:
select * from exams where name = 'Programmierung 1';
Einer Query kann auch ein Limit mitgegeben werden, so dass sie automatisch endet, wenn die gesetzte Anzahl von Ergebnissen ausgegeben worden ist: select * from exams where name = 'Programmierung 1' limit 1;
Ferner sind natürlich auch Aggregat-Funktionen wie z.B. die Berechnung des Notendurchschnitts pro Fach möglich (auch wenn es bis dato keine avg-Funktion gibt):
Ferner sind natürlich auch Aggregat-Funktionen wie z.B. die Berechnung des Notendurchschnitts pro Fach möglich (auch wenn es bis dato keine avg-Funktion gibt):
select name, sum(note) / count(name) from exams group by name;
Dieser Aufruf ergibt folgende Ausgabe:
Big Data | 1.0
Programmierung 1 | 3.0
Programmierung 1 | 3.0
Interessant dabei ist es nun zu beobachten, was auf der nicht abgebrochenen Streaming Query passiert, wenn ein weiteres Prüfungsergebnis in das Kafka-Topic geschrieben wird. Das können wir bspw. tun, in dem wir den Konsole-Producer wie oben nur ohne Piping aufrufen und dann folgende Zeile dort hinein pasten:
19:{"id":"19","name":"Big Data","note":2.0,"matrikelnummer":"55555"}
Dadurch wird das Ergebnis der noch laufenden Query aktualisiert und ein neuer Durchschnitt für das Fach Big Data ausgeben:
Big Data | 1.5
Analog würde der neue Eintrag auch bei einer einfachen Select-Query angehängt werden. Übrigens unterstützt es Kafka auch direkt, den Timestamp von Einträgen für Menschen lesbar zu formatieren: select TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') from exams;
Tables vs. Streams
Neben Tables unterstützt KSQL auch sogenannte Streams, die ebenfalls per SQL durchsucht werden können. Während Tabellen immer den aktuellen Stand der Datenlage, also quasi einen Snapshot, ausgeben, speichern und zeigen Streams immer alle aufgetretenen Events. Schauen wir uns als Beispiel die Änderung einer Note an, die wir über den Konsole-Producer einspielen können:
19:{"id":"19","name":"Big Data","note":1.7,"matrikelnummer":"55555"}
Eine Select-Abfrage auf der Tabelle liefert für diesen Key genau den zuletzt eingetragenen Stand der Note, bzw. natürlich auch den aller enthaltenen Spalten:
select * from exams where id='19';
--> 1535127212852 | 19 | 19 | Big Data | 1.0 | 55555
Wie sieht das im Gegensatz dazu bei einem Stream aus? Auch diesen müssen wir zunächst einmal anlegen:
create stream str_exams (id varchar, name varchar, note double, matrikelnummer varchar) with (kafka_topic = 'exams', value_format='json', key = 'id');
Danach können wir die entsprechende Abfrage losschicken:
select * from str_exams where id='19';
--> 1535126428549 | 19 | 19 | Big Data | 2.0 | 55555
1535127660012 | 19 | 19 | Big Data | 1.7 | 55555
1535127660012 | 19 | 19 | Big Data | 1.7 | 55555
... und erhalten hier die gesamte Stream-Historie unseres Datensatzes. Ein offensichtlicher Usecase im vorliegenden Fall ist daher die Historisierung von Daten, die richtig spannende Möglichkeit liegt hier allerdings darin, dass eine Verarbeitung von Streaming-Daten mit Hilfe von SQL-Befehlen möglich wird, also kein eigenes Streaming Framework wie Flink oder Storm mehr notwendig ist. Darauf werde ich sicher bei nächster Gelegenheit noch genauer eingehen.
Sowohl Tabellen als auch Streams können übrigens wie gewohnt über drop table bzw. stream plus den Namen wieder gelöscht werden.
Sowohl Tabellen als auch Streams können übrigens wie gewohnt über drop table bzw. stream plus den Namen wieder gelöscht werden.
Limitierungen
Beenden wollen wir unseren ersten Ausflug in die Welt von KSQL, allerdings mit der Frage, wie es um die Join-Fähigkeiten der aktuellen KSQL-Version bestellt ist. Grundsätzlich sind Joins zwar vorgesehen, sie funktionieren zwischen Tabellen allerdings bis dato (Stand Ende 2019) nur auf den Keys selbst (vgl. hier). Mit anderen Worten sind bisher also nur 1:1-Joins und keine 1:N- bzw. N:1-Verknüpfungen über Fremdschlüssel möglich. Bei Joins, die auf Fremdschlüsseln beruhen, wird die rechte Seite Seite immer mit null-Werten aufgefüllt. Naheliegend ist die Vermutung, dass das (noch?) nicht funktioniert, da Kafka bisher keine Indices unterstützt.
Um das näher zu illustrieren benötigen wir noch einige ausgedachte Studenten (mit Studiengängen der HS Mannheim) in einem neuem Kafka-Topic namens "students". Das geht wieder wie oben mit dem Konsole-Producer:
12345:{"vorname":"Klaus","name":"Mayer","matrikelnummer":"12345","studiengang":"UIB"}23456:{"vorname":"Katrin","name":"Müller","matrikelnummer":"23456","studiengang":"IMB"}55555:{"vorname":"Steven","name":"Müller-Mayer","matrikelnummer":"55555","studiengang":"IB"}
Dieses können wir nun ebenfalls analog zu oben in eine Tabelle wandeln:
create table studis (vorname varchar, name varchar, matrikelnummer varchar, studiengang varchar) with (kafka_topic = 'students', value_format='json', key = 'matrikelnummer');
Nun läuft allerdings folgender Join ins Leere:
select * from exams left join studis on studis.matrikelnummer = exams.matrikelnummer; -->
1535126105952 | 1 | 1 | Big Data | 1.0 | 12345 | null | null | null | null | null | null
1535126105961 | 5 | 5 | Programmierung 1 | 2.0 | 23456 | null | null | null | null | null | null
...
Um das näher zu illustrieren benötigen wir noch einige ausgedachte Studenten (mit Studiengängen der HS Mannheim) in einem neuem Kafka-Topic namens "students". Das geht wieder wie oben mit dem Konsole-Producer:
12345:{"vorname":"Klaus","name":"Mayer","matrikelnummer":"12345","studiengang":"UIB"}23456:{"vorname":"Katrin","name":"Müller","matrikelnummer":"23456","studiengang":"IMB"}55555:{"vorname":"Steven","name":"Müller-Mayer","matrikelnummer":"55555","studiengang":"IB"}
Dieses können wir nun ebenfalls analog zu oben in eine Tabelle wandeln:
create table studis (vorname varchar, name varchar, matrikelnummer varchar, studiengang varchar) with (kafka_topic = 'students', value_format='json', key = 'matrikelnummer');
Nun läuft allerdings folgender Join ins Leere:
select * from exams left join studis on studis.matrikelnummer = exams.matrikelnummer; -->
1535126105952 | 1 | 1 | Big Data | 1.0 | 12345 | null | null | null | null | null | null
1535126105961 | 5 | 5 | Programmierung 1 | 2.0 | 23456 | null | null | null | null | null | null
...
Ein entsprechernder Join einer Tabelle mit einem Stream funktioniert interessanterweise bereits, was sich wie folgt als Workaround nutzen lässt, um Fremdschlüsselbeziehungen abzubilden:
select * from str_exams left join studis on str_exams.matrikelnummer = studis.matrikelnummer; -->
1535316616713 | 12345 | 1 | Big Data | 1.0 | 12345 | 1535316139962 | 12345 | Klaus | Mayer | 12345 | UIB
1535316616723 | 23456 | 5 | Programmierung 1 | 2.0 | 23456 | 1535316139970 | 23456 | Katrin | Mueller | 23456 | IMB
1535316616723 | 12345 | 17 | Programmierung 1 | 4.0 | 12345 | 1535316139962 | 12345 | Klaus | Mayer | 12345 | UIB
1535316616713 | 12345 | 1 | Big Data | 1.0 | 12345 | 1535316139962 | 12345 | Klaus | Mayer | 12345 | UIB
1535316616723 | 23456 | 5 | Programmierung 1 | 2.0 | 23456 | 1535316139970 | 23456 | Katrin | Mueller | 23456 | IMB
1535316616723 | 12345 | 17 | Programmierung 1 | 4.0 | 12345 | 1535316139962 | 12345 | Klaus | Mayer | 12345 | UIB
...
Vorausgesetzt, wir haben in unserem Stream keine Wertänderungen auf ein und dem selben Key, die dann natürlich mehrfach gejoint werden würden. Eine weitere Voraussetzung ist, dass beide Datensätze nach der "Join-Spalte" partitioniert sind (sog. Co-Partitioning, d.h., dass die zu joinenden Elemente auf dem gleichen Broker liegen). Das ist üblicherweise auf der Fremdschlüsselseite nicht der Fall, kann aber ggf. mit einem "partition by" erzwungen werden, vgl. hier.
Ein abschließendes Wort der Warnung soll diesen Post beschließen, Kafka ist per default "vergesslich" und löscht Daten nach einer Woche, wenn die Größe eines Topics 1 GB übersteigt. Wer hier tiefer einsteigen möchte, sollte sich die beiden Parameter log.retention.hours und log.retention.bytes genauer betrachten und sie ggf. auf -1 setzen, um das Löschen zu verhindern.
Ein abschließendes Wort der Warnung soll diesen Post beschließen, Kafka ist per default "vergesslich" und löscht Daten nach einer Woche, wenn die Größe eines Topics 1 GB übersteigt. Wer hier tiefer einsteigen möchte, sollte sich die beiden Parameter log.retention.hours und log.retention.bytes genauer betrachten und sie ggf. auf -1 setzen, um das Löschen zu verhindern.
Weiterführend
- Fortsetzung dieses Posts mit einem tieferen Einstieg in die Stream-Verarbeitung
- einführendes Video bei Confluent
- Anwendung von KSQL & Grafana zur Visualisierung von Fahrbewegungen
- Hier noch ein Beispiel wie Tweets ausgewertet werden können