Vor einigen Tagen hatte ich über die Grundlagen von KSQL als
Schnittstelle zur Stream-Verarbeitung mit Kafka geschrieben. Die
Beispiele dort sind zum Einstieg sehr stark an eine "klassische"
relationale Sicht angelehnt. Eigentliches Ziel von und Hauptargument für
KSQL ist aber sicherlich die Verarbeitung von "endlosen" Datenströmen
über eine erweiterte SQL-Syntax. Und weil diese Möglichkeiten so schön
und spannend sind, möchte ich darauf in diesem Post näher eingehen.
Hintergründe
Ein erster einfacher Use Case für die Stream-Verarbeitung mit KSQL
könnte bspw. das Filtern von Daten in einem Kafka-Topic sein. Betrachten
wir uns dazu noch einmal das Beispiel mit den Studierenden und ihren
Prüfungsergebnissen aus meinem einleitenden Blog-Post zum Thema.
Einträge von Studierenden bestehen aus Vorname, Name, Matrikelnummer und Studiengang(skürzel) und können direkt als JSON über den sog. Konsole-Producer (Details zum Aufruf - na wo wohl - im Vorgänger-Post) in Kafka eingespielt werden:
55555:{"vorname":"Steven","name":"Müller-Mayer","matrikelnummer":"55555","studiengang":"IB"}
Zur Erinnerung, Daten werden als Key-Value-Paare in Kafka-Topics gestellt, der Topic-Key sollte zur Weiterverarbeitung als Stream oder Tabelle dem Primärschlüssel entsprechen.
Mit Prüfungsdaten funktioniert das analog, die Matrikelnummer ist hier der Fremdschlüssel, um den entsprechenden Studierenden auflösen zu können:
1:{"id":"1","name":"Big Data","note":1.0,"matrikelnummer":"12345"}
Wie im vorherigen Post gezeigt, legen wir für die Stammdaten der Studierenden ganz klassisch eine Tabelle an. Die (für dieses Beispiel) quasi fortlaufend hereinkommenden Prüfungsergebnisse definieren wir in KSQL als einen Stream von Events.
Einträge von Studierenden bestehen aus Vorname, Name, Matrikelnummer und Studiengang(skürzel) und können direkt als JSON über den sog. Konsole-Producer (Details zum Aufruf - na wo wohl - im Vorgänger-Post) in Kafka eingespielt werden:
55555:{"vorname":"Steven","name":"Müller-Mayer","matrikelnummer":"55555","studiengang":"IB"}
Zur Erinnerung, Daten werden als Key-Value-Paare in Kafka-Topics gestellt, der Topic-Key sollte zur Weiterverarbeitung als Stream oder Tabelle dem Primärschlüssel entsprechen.
Mit Prüfungsdaten funktioniert das analog, die Matrikelnummer ist hier der Fremdschlüssel, um den entsprechenden Studierenden auflösen zu können:
1:{"id":"1","name":"Big Data","note":1.0,"matrikelnummer":"12345"}
Wie im vorherigen Post gezeigt, legen wir für die Stammdaten der Studierenden ganz klassisch eine Tabelle an. Die (für dieses Beispiel) quasi fortlaufend hereinkommenden Prüfungsergebnisse definieren wir in KSQL als einen Stream von Events.
Einfache Stream-Verarbeitung
Betrachten wir uns zum Einstieg in die Verarbeitung von Streams zunächst
nur eine Select-Anfrage, die die beste Note für eine bestimmte Prüfung
ermittelt, das könnte beispielsweise hilfreich sein, um eine
Notenübersicht mit Anzahl der jeweils benötigten Versuche für das
Zeugnis zu generieren (ich habe dafür noch zwei weitere
Prüfungsergebnisse in Kafka eingespielt):
select id, min(note), name, matrikelnummer, count(note) from str_exams where matrikelnummer = '12345' group by id, matrikelnummer, name;
-->
1 | 1.0 | BDEA | 12345 | 1
37 | 4.0 | Programmierung 2 | 12345 | 2
Eine weitere einfache Anwendung auf diesen Daten könnte nun beispielsweise sein, sehr gute Studierende bzw. zunächst Ihre Prüfungsergebnisse herauszufiltern, um ihnen etwa Tutoren-Verträge im jeweiligen Fach anbieten zu können. Dazu leiten wir aus unserem Stream einfach folgendermaßen einen neuen Stream ab:
select name, note, matrikelnummer from str_exams where note < 1.5;
Möchten wir auf das gefilterte Ergebnis wieder als eigenständigen Stream zugreifen können ist das sehr einfach durch folgende fettgedruckte Ergänzung möglich:
create stream great_exams as select name, note, matrikelnummer from str_exams where note < 1.5;
Ein einfaches Select auf diesem Stream liefert uns daraufhin das bisher einzige sehr gute Prüfungsergebnis:
select * from great_exams;
--> 1535386252767 | 1 | BDEA | 1.0 | 12345
Um auch die Namen der Studierenden ausgeben zu können, ist offensichtlich ein Join mit der Tabelle der Studierendendaten erforderlich:
create stream great_exams as select str_exams.name, note, str_exams.matrikelnummer, studis.name from str_exams left join studis on str_exams.matrikelnummer = studis.matrikelnummer where note < 1.5;
select id, min(note), name, matrikelnummer, count(note) from str_exams where matrikelnummer = '12345' group by id, matrikelnummer, name;
-->
1 | 1.0 | BDEA | 12345 | 1
37 | 4.0 | Programmierung 2 | 12345 | 2
Eine weitere einfache Anwendung auf diesen Daten könnte nun beispielsweise sein, sehr gute Studierende bzw. zunächst Ihre Prüfungsergebnisse herauszufiltern, um ihnen etwa Tutoren-Verträge im jeweiligen Fach anbieten zu können. Dazu leiten wir aus unserem Stream einfach folgendermaßen einen neuen Stream ab:
select name, note, matrikelnummer from str_exams where note < 1.5;
Möchten wir auf das gefilterte Ergebnis wieder als eigenständigen Stream zugreifen können ist das sehr einfach durch folgende fettgedruckte Ergänzung möglich:
create stream great_exams as select name, note, matrikelnummer from str_exams where note < 1.5;
Ein einfaches Select auf diesem Stream liefert uns daraufhin das bisher einzige sehr gute Prüfungsergebnis:
select * from great_exams;
--> 1535386252767 | 1 | BDEA | 1.0 | 12345
Um auch die Namen der Studierenden ausgeben zu können, ist offensichtlich ein Join mit der Tabelle der Studierendendaten erforderlich:
create stream great_exams as select str_exams.name, note, str_exams.matrikelnummer, studis.name from str_exams left join studis on str_exams.matrikelnummer = studis.matrikelnummer where note < 1.5;
Löschen von Streams
Da der zuerst angelegte Stream wieder gelöscht werden muss, bevor die
Variante mit dem Join unter gleichem Namen angelegt werden kann, an
dieser Stelle eine kurze Zwischenbemerkung zum Löschen von Streams. Es
funktioniert wie gewohnt per drop stream great_exams; allerdings muss
zuvor noch die laufende Streaming-Query wie folgt terminiert werden:
terminate CSAS_GREAT_EXAMS;
Vergisst man das Terminieren vor dem Drop beschwert sich KSQL entsprechend, liefert praktischerweise den Namen der Query in der Fehlermeldung direkt mit, so dass dieser einfach kopiert werden kann.
Ein weitere Besonderheit zeigt sich allerdings noch nach einer Select-Abfrage auf dem gejointen Stream, hier taucht nach dem neu Anlegen sonderbarerweise ein Ergebnis mit null-Einträgen auf:
select * from great_exams;
-->
1535386252767 | 1 | null | 1.0 | null | null
1535386252767 | 12345 | BDEA | 1.0 | 12345 | Mayer
Das liegt schlicht daran, dass KSQL für jeden neu angelegten Stream ein Topic zur Datenspeicherung benötigt, dieses Topic automatisch nach dem Stream-Namen benennt und es beim Löschen des Streams nicht mitlöscht. Folglich bleibt in diesem Fall das "ungejointe" Ergebnis aus der ersten Select-Abfrage im Topic liegen und führt nun zu den Null-Einträgen.
Das Löschen von Topics funktioniert in der aktuellen Version 5.0.0 der Confluent-Plattform noch nicht über KSQL, das geht bis dato nur mit dem aktuellen Preview-Release durch Ergänzen von delete topic nach dem Drop (vgl. hier) oder direkt über Kafka. Alternativ lässt sich der Name des Topics über den Parameter kafka_topic beim Erstellen eines Streams bzw. einer Tabelle auch selbst festlegen.
terminate CSAS_GREAT_EXAMS;
Vergisst man das Terminieren vor dem Drop beschwert sich KSQL entsprechend, liefert praktischerweise den Namen der Query in der Fehlermeldung direkt mit, so dass dieser einfach kopiert werden kann.
Ein weitere Besonderheit zeigt sich allerdings noch nach einer Select-Abfrage auf dem gejointen Stream, hier taucht nach dem neu Anlegen sonderbarerweise ein Ergebnis mit null-Einträgen auf:
select * from great_exams;
-->
1535386252767 | 1 | null | 1.0 | null | null
1535386252767 | 12345 | BDEA | 1.0 | 12345 | Mayer
Das liegt schlicht daran, dass KSQL für jeden neu angelegten Stream ein Topic zur Datenspeicherung benötigt, dieses Topic automatisch nach dem Stream-Namen benennt und es beim Löschen des Streams nicht mitlöscht. Folglich bleibt in diesem Fall das "ungejointe" Ergebnis aus der ersten Select-Abfrage im Topic liegen und führt nun zu den Null-Einträgen.
Das Löschen von Topics funktioniert in der aktuellen Version 5.0.0 der Confluent-Plattform noch nicht über KSQL, das geht bis dato nur mit dem aktuellen Preview-Release durch Ergänzen von delete topic nach dem Drop (vgl. hier) oder direkt über Kafka. Alternativ lässt sich der Name des Topics über den Parameter kafka_topic beim Erstellen eines Streams bzw. einer Tabelle auch selbst festlegen.
Weitere Abfragen
Mit einem Join ist nun bspw. auch eine Filterung der Prüfungsergebnisse nach Studiengang leicht machbar:
create stream imb_exams as select * from str_exams left join studis on str_exams.matrikelnummer = studis.matrikelnummer where studis.studiengang = 'IMB';
Bisher haben wir aus existierenden Streams abermals Streams abgeleitet, da wir den Input-Stream nur gefiltert haben. Aggregieren wir Daten auf einem Stream, wird die Ergebnissemantik eine andere. Dann interessieren uns nicht mehr einzelne Events, sondern wir sind an der Zusammenfassung mehrerer Ereignisse interessiert. Diese werden in KSQL in eine Tabelle persistiert.
Wie das folgende Beispiel verdeutlicht, ist auch das nicht weiter schwierig: angenommen wir möchten alle Studierenden, die eine Prüfung zum dritten Mal nicht bestanden haben, als endgültig durchgefallen markieren, können wir sie folgendermaßen in einer Tabelle speichern (und auf dieser wieder beliebige SQL-Abfragen ausführen):
create table finally_failed as select matrikelnummer, name, count(*) from str_exams group by matrikelnummer, name having min(note) = 5.0 and count(*) > 2;
create stream imb_exams as select * from str_exams left join studis on str_exams.matrikelnummer = studis.matrikelnummer where studis.studiengang = 'IMB';
Bisher haben wir aus existierenden Streams abermals Streams abgeleitet, da wir den Input-Stream nur gefiltert haben. Aggregieren wir Daten auf einem Stream, wird die Ergebnissemantik eine andere. Dann interessieren uns nicht mehr einzelne Events, sondern wir sind an der Zusammenfassung mehrerer Ereignisse interessiert. Diese werden in KSQL in eine Tabelle persistiert.
Wie das folgende Beispiel verdeutlicht, ist auch das nicht weiter schwierig: angenommen wir möchten alle Studierenden, die eine Prüfung zum dritten Mal nicht bestanden haben, als endgültig durchgefallen markieren, können wir sie folgendermaßen in einer Tabelle speichern (und auf dieser wieder beliebige SQL-Abfragen ausführen):
create table finally_failed as select matrikelnummer, name, count(*) from str_exams group by matrikelnummer, name having min(note) = 5.0 and count(*) > 2;
Windowing
Das war aber noch längst nicht alles, was KSQL zu bieten hat: wer sich
bereits mit der Verarbeitung von Daten-Streams beschäftigt hat, weiß,
dass es meist spannend ist, Events innerhalb von Zeitfenstern zu
analysieren. Beispielsweise wird sicherlich jedes gut genutzte
Kreditkarten-Konto über seine Lebensdauer viele Transaktionen erfahren,
aber viele Transaktionen innerhalb einer Zeitspanne von bspw. 10 min,
könnten auf Betrugsversuche hindeuten. Um entsprechende Abfragen
komfortabel ausführen zu können, muss das Stream-Verarbeitungs-Framework
die Definition von Windows unterstützen. Und natürlich bietet auch KSQL
diese Möglichkeit (vgl. z.B. dieses Video ab Minute 1:50).
Betrachten wir als Beispiel die im Hochschul-Alltag zugegebenermaßen etwas konstruierte Fragestellung, ob es Dozenten/Bearbeiter gibt, die Noten einer Prüfung innerhalb eines kurzen Zeitrahmens häufig ändern. In diesem Fall möchten wir in einem neuen Stream darüber informiert werden. Die eigentliche SQL-Abfrage dazu gestaltet sich relativ einfach:
select id, name, matrikelnummer from str_exams group by id, name, matrikelnummer having count(*) > 2;
Um die entsprechende Eingrenzung auf einen Zeitraum von bspw. einer Minute zu erreichen, definieren wir nun noch ein entsprechendes Zeitfenster:
select id, name, matrikelnummer from str_exams window tumbling (size 1 minute) group by id, name, matrikelnummer having count(*) > 2;
Auch eine Tabelle lässt sich davon wieder problemlos ableiten:
create table changes as select id, name, matrikelnummer from str_exams window tumbling (size 1 minute) group by id, name, matrikelnummer having count(*) > 2;
KSQL bietet drei Arten von Fenstern an, das bereits gezeigte Tumbling Window, das nicht überlappende Abschnitte des definierten Zeitraums umfasst, das sog. Hopping Window, das die Fenster immer eine bestimmte Zeitspanne überlappen lässt oder sog. Session Windows, die alle Events zusammenfassen, die nicht durch eine Pause bestimmter Länge unterbrochen werden. Für letzteres lässt sich obige Anfrage z.B. wie folgt formulieren:
select id, name, matrikelnummer from str_exams window session (1 minute) group by id, name, matrikelnummer having count(*) > 2;
Betrachten wir als Beispiel die im Hochschul-Alltag zugegebenermaßen etwas konstruierte Fragestellung, ob es Dozenten/Bearbeiter gibt, die Noten einer Prüfung innerhalb eines kurzen Zeitrahmens häufig ändern. In diesem Fall möchten wir in einem neuen Stream darüber informiert werden. Die eigentliche SQL-Abfrage dazu gestaltet sich relativ einfach:
select id, name, matrikelnummer from str_exams group by id, name, matrikelnummer having count(*) > 2;
Um die entsprechende Eingrenzung auf einen Zeitraum von bspw. einer Minute zu erreichen, definieren wir nun noch ein entsprechendes Zeitfenster:
select id, name, matrikelnummer from str_exams window tumbling (size 1 minute) group by id, name, matrikelnummer having count(*) > 2;
Auch eine Tabelle lässt sich davon wieder problemlos ableiten:
create table changes as select id, name, matrikelnummer from str_exams window tumbling (size 1 minute) group by id, name, matrikelnummer having count(*) > 2;
KSQL bietet drei Arten von Fenstern an, das bereits gezeigte Tumbling Window, das nicht überlappende Abschnitte des definierten Zeitraums umfasst, das sog. Hopping Window, das die Fenster immer eine bestimmte Zeitspanne überlappen lässt oder sog. Session Windows, die alle Events zusammenfassen, die nicht durch eine Pause bestimmter Länge unterbrochen werden. Für letzteres lässt sich obige Anfrage z.B. wie folgt formulieren:
select id, name, matrikelnummer from str_exams window session (1 minute) group by id, name, matrikelnummer having count(*) > 2;
Noch ein Wort zu Timestamps
Abschließend noch einige Gedanken zu Timestamps in einer verteilten
Big-Data-Architektur. Hier kann es natürlich passieren, dass Daten Kafka
erst mit einer gewissen Verzögerung erreichen, der Kafka-Timestamp also
nicht geeignet für die weitere Analyse ist. Per Default verwendet Kafka
eine sog. Retention Time von einem Tag innerhalb derer Fenster noch mit
später kommenden Daten aktualisiert werden. Dazu ist üblicherweise die
Verwendung der sog. Event Time, also eines Zeitstempels, der im
Ursprungssystem generiert wird, notwendig. Das funktioniert schlicht
über die Angabe des Parameters TIMESTAMP='spaltenname' beim Anlegen
eines Streams, wobei der genannte Spaltenname einen Zeitstempel in den
Quelldaten referenzieren muss. Den Rest erledigt Kafka automatisch.