Kafka ist eine verteilte Streaming-Plattform die verschiedene Nutzungsszenarien unterstützt. Sie kann als Messaging- oder Speichersystem eingesetzt werden und Datenströme transformieren. Dieser Artikel beschreibt, wie Sie mit Kafka-Connect Drittsysteme wie Redis an Kafka anbinden können.
Kafka bietet eine flexible Grundlage, um darauf komplexe Datenverarbeitungsarchitekturen zu bauen. Eine wichtige Frage bei diesen Architekturen ist, wie die Daten in die Plattform gelangen und wie sie diese wieder verlassen. Hier kommt Kafka-Connect ins Spiel, ebenfalls eine quelloffene Komponente von Kafka. Kafka-Connect ist ein Framework mit dem man externe Systeme, wie zum Beispiel Datenbanken, Key-Value-Speicher, Suchindizes oder Dateisysteme an Kafka anschließen kann. Für Kafka-Connect gibt es bereits zahlreiche vorgefertigte Konnektoren, wie zum Beispiel für Splunk oder Elasticsearch1. Diese Konnektoren teilen sich in zwei Gruppen auf: Sources und Sinks -also Quellen und Senken. Quellkonnektoren können zum Beispiel ganze Datenbanken auslesen und Änderungen in der Datenbank dann in Kafka-Topics veröffentlichen. Ein anderer Anwendungsfall ist das Sammeln von Metriken von Microservices, um sie dann dem Stream-Processing mit geringer Latenz zur Verfügung zu stellen und sie dann auswerten zu können. Sinks bringen die Daten aus den Kafka-Topics dann zum Beispiel in Suchindizes wie Elasticsearch oder in Batch-Systeme wie Hadoop, zur späteren Offline-Verarbeitung.
Nun könnte man meinen, das alles kann auch ohne ein zusätzliches Framework realisiert werden, indem man über das API von Kafka die Daten nach Kafka schreibt oder daraus liest. Worin liegen also die Vorteile von Kafka-Connect? Kafka-Connect stellt eine API bereit, die es ermöglicht, dass ein Konnektor nach einem Fehler an der Stelle fortfährt, an der er unterbrochen worden ist. Dabei wird der Konnektor automatisch neu gestartet und gegebenenfalls auch auf einen neuen Knoten innerhalb des Kafka-Clusters gestartet. Zusätzlich macht es die API einfacher mehrere Worker parallel zu starten, die dasselbe Quell- bzw. Zielsystem versorgen.
Auch wenn bereits viele verschiedene Konnektoren verfügbar sind, kann es dennoch vorkommen, dass für ein Legacy-System oder eine Eigenentwicklung kein Konnektor zur Verfügung steht oder dass die benötige Funktionalität nicht angeboten wird. Das stellt jedoch kein Hindernis für den Einsatz von Kafka und Kafka-Connect dar. Das Implementieren eines eigenen Konnektors ist relativ einfach und ermöglicht es praktisch jedes System mit Kafka zu verbinden. Wie bereits erwähnt unterscheidet Kafka-Connect zwischen Quellen und Senken, je nachdem ob Daten nach Kafka hinein oder hinaus bewegt werden. Es gibt also zwei verschiedene Interfaces, die je nach Anwendungsfall implementiert werden müssen. Um eine Quelle anzuschließen müssen
org.apache.kafka.connect.source.SourceConnector und org.apache.kafka.connect.source.SourceTask
implementiert werden. Für eine Senke sind es SinkConnector und SinkTask. Ein JDBCSourceConnector würde zum Beispiel die Daten einer JDBC-kompatiblen Datenbank nach Kafka importieren und ein HDFSSinkConnector würde den Inhalt eines Kafka-Topics nach HDFS exportieren. Dabei ist zu beachten, dass der Konnektor selbst nicht für das Kopieren der Daten verantwortlich ist. Er erhält beim Starten von Kafka-Connect seine Konfiguration und leitet daraus eine Konfiguration für die Tasks ab. Die Konfiguration des Konnektors enthält zum Beispiel den Dateipfad oder die JDBC-URL und wie viele Tasks erzeugt werden sollen. Bei einem Dateikonnektor könnte zum Beispiel konfiguriert werden, dass es zwei Tasks geben darf. Der Konnektor wäre dann dafür verantwortlich, nicht nur den Dateipfad an den Task zu übergeben, sondern auch welcher Bereich aus der Datei gelesen werden soll, so dass es zwei Tasks geben kann, die sich nicht gegenseitig beeinflussen. Diese Tasks werden dann von Kafka-Connect-Workern verarbeitet2. Auch bei diesen Tasks wird wieder zwischen Quelle und Senke unterschieden. Der Konnektor kann bei Bedarf auf Änderungen des externen Systems reagieren und den Task geeignet rekonfigurieren.
Schemabild wie ein Quellkonnektor mit Kafka zusammenarbeitet (Abb. 1)
Hat der Task nun die Konfiguration eines Konnektors bekommen, ist dieser dafür verantwortlich die Daten in das konfigurierte Kafka-Topic zu schreiben oder aus einem Topic zu lesen. Dabei müssen die Daten ebenfalls in verschiedene Partitionen aufgeteilt werden, analog dazu, wie Kafka Topics strukturiert. Innerhalb jeder Partition sind die Daten wieder eine geordnete Sequenz mit entsprechenden Offsets. Diese Aufteilung zu finden kann je nach Anwendung sehr einfach oder aber aufwändig sein. Wenn zum Beispiel Log-Files (Abb. 1) gelesen werden, kann jede einzelne Datei einer Partition entsprechen, jede Zeile ist ein Eintrag und der Offset ist einfach die Position innerhalb der Datei. Bei einem JDBC-Konnektor kann es da schon schwieriger werden. Eine Möglichkeit wäre, jede Tabelle auf ein Topic abzubilden. Innerhalb des Topics kann dann der Zeitstempel der letzten Abfrage genutzt werden, um eine Reihenfolge zu definieren.
Redis-Konnektor
Wie wird nun also ein Konnektor konkret implementiert? Als Quellsystem soll ein Redis-Cache dienen, der über das Pub/Sub-System angeschlossen wird. Das ermöglicht es zu zeigen, welche Schnittstellen für Kafka-Connect implementiert werden müssen und welche Funktionen dadurch nutzbar werden, ohne dass die Komplexität zu groß wird. Damit sich der Task mit Redis verbinden kann, benötigen wir Informationen wie Redis erreichbar ist. Diese Informationen werden von Kafka-Connect an den Redis-Konnektor weitergegeben (Listing 1).
Es sind also folgende Eigenschaften notwendig: Hostname Dieses Beispiel ist stark vereinfacht, um die wesentlichen Implementierungsschritte zu zeigen. Ein Redis-Konnektor mit vollem Funktionsumfang würde nicht nur einen einzelnen Channel beobachten, sondern mit Hilfe von PUBSUB CHANNELS alle aktiven Channels von Redis abfragen und für jeden Channel dann einen Task erzeugen, der dann in sein eigenes Topic schreibt. Eines der Designziele von Kafka-Connect ist, dass die Konnektoren mit großen Mengen an Daten umgehen können, bei gleichzeitig einfacher Konfiguration.
(Listing 1)
Damit diese Konfigurationsparameter beim Anlegen eines Konnektors auch übergeben werden können, muss die start Methode geeignet überschrieben werden (Listing 2). Diese Methode bekommt eine Map aus der dann die Konfigurationseinstellungen für den Redis-Konnektor extrahiert werden.
(Listing 2)
Die generische Map wird zunächst mit Hilfe einer Konfigurationsdefinition CONFIG_DEF in eine AbstractConfig umgewandelt, anschließend werden dann die einzelnen Konfigurationsparameter extrahiert. Die CONFIG_DEF kann im veröffentlichten Quellcode3 eingesehen werden. Diese Konfigurationsdefinition wird zum Beispiel von dem Confluent-Control-Center verwendet, um eine Konfigurationsoberfläche für den Konnektor anzuzeigen.
Redis-Task
Damit Kafka-Connect nun einen Source-Task erstellen kann, müssen noch zwei weitere Methoden überschrieben werden. Zum einen muss mitgeteilt werden, welche Klasse den Task für diesen Konnektor übernimmt und wie dieser Task konfiguriert werden muss. Man erkennt, dass in dem RedisSourceConnector in erster Linie die Konfiguration für die Tasks aufbereitet wird, um dann zu entscheiden wie viele Tasks erzeugt werden sollen. Um den Konnektor robuster zu machen, könnte man hier eine Validierung einbauen und prüfen ob mit den übergebenen Parametern wie HOST und PORT Redis überhaupt erreichbar ist.
Konkret müssen die Methoden taskClass und taskConfigs überschrieben werden. In taskClass wird die Klasse definiert, die den Task dann tatsächlich ausführt, also RedisSourceTask. In taskConfigs muss die Konfiguration für die Tasks erstellt werden (Listing 3). Je nach Konnektor kann dies mehr oder weniger aufwendig sein. In diesem einfachen Beispiel wird die Konfiguration 1:1 an den Task weitergegeben. Das bedeutet auch, dass genau ein Task erzeugt wird. Falls der Anwendungsfall es zulässt, dass mehr als ein Task erzeugt werden kann muss darauf geachtet werden, dass nicht mehr als im Parameter von taskConfigs angegeben erzeugt werden.
(Listing 3)
Damit ist der RedisSourceConnector fertig implementiert. Der zugehörige RedisSourceTask implementiert wie die Daten kopiert werden. Doch betrachten wir zunächst, wie Kafka-Connect mit diesem Task interagiert. Das SourceTask Interface definiert die poll Methode, die von den Worker-Prozessen von Kafka-Connect wiederholt aufgerufen wird, um eine Liste von Einträgen abzurufen und diese dann in das entsprechende Topic zu schreiben. Dabei ist zu beachten, dass Kafka-Connect für jeden Task einen dedizierten Thread zur Verfügung stellt. Das bedeutet die Implementierung von poll darf blockieren. Das ermöglicht es zunächst mit einer einfachen Implementierung zu starten und diese dann später zu optimieren. Bevor nun Kafka-Connect poll aufrufen kann, muss der Task gestartet werden. Dies geschieht indem die start Methode aufgerufen wird (Listing 4). Dort wird nun zunächst eine Verbindung zum Redis-Server aufgebaut (1), um dann den RedisSourceTask als Listener bei dem Pub/Sub-System von Redis zu registrieren (2).
(Listing 4)
Danach wird der konfigurierte Kanal abonniert (3) und eine Deque (4) initialisiert (Listing 4). Eine Deque ist eine doppelseitige Warteschlange, mit der eine Verbindung zwischen der poll Methode und dem Event-Listener hergestellt werden kann.
(Listing 5)
Da der RedisSourceTask als Listener bei dem Redis-Client registriert wurde, muss nun auch das entsprechende Interface implementiert werden (Listing 5). Relevant ist dabei die message Methode, die für das Verarbeiten von Nachrichten verantwortlich ist, die aus dem Pub/Sub-System von Redis empfangen werden. Hier wird ein SourceRecord erstellt (1) und dann mit add an das Ende der Warteschlange angehängt (2). Der SourceRecord ist ein von Kafka-Connect zur Verfügung gestellter Datentyp, der einen Eintrag in ein Kafka-Topic beschreibt. Dieser enthält neben den Daten auch das Topic, in das die Daten geschrieben werden sollen und Informationen woher diese Nachricht kam. Die sourcePartition beschreibt in diesem Fall den Ursprung der Daten – hier der Kanal, der belauscht wird. Mit dem sourceOffset kann angegeben werden, bis zu welcher Stelle man bereits von dem Quellsystem gelesen hat, um bei einem Neustart des Tasks wieder an dieser Stelle fortfahren zu können. Bei dem Redis-Task ist dieser Wert nicht gesetzt, da beim Pub/Sub-System die Nachrichten nicht zwischengespeichert werden, sondern nur an alle Empfänger gesendet werden, die im Moment des Sendens verbunden sind. Wenn eine Datei eingelesen wird, kann zum Beispiel die Zeilennummer als Offset verwendet werden, die gerade gelesen worden ist. Zusätzlich kann noch ein Schema mit übergeben werden, also eine Beschreibung welches Format die Nachricht hat. Dadurch ist es möglich, verschiedene Nachrichten in verschiedenen Formaten an Kafka zu senden und beim Lesen wieder anhand des Schemas zu rekonstruieren.
(Listing 6)
Die so erzeugten Einträge in der Warteschlage werden nun durch wiederholtes Aufrufen der poll Methode von den Kafka-Connect Workern angeholt und in das angegebene Topic geschrieben. Dabei wird die Warteschlange abgefragt bis sie leer ist und dann an Kafka-Connect zurückgegeben. Falls ein Quell-System angeschlossen wird, das ein Quittieren der gelesenen Nachrichten unterstützt, beispielsweise eine Message-Queue, kann dies in den commit oder commitrecord Methoden realisiert werden.
Paketierung des Konnektors
Damit ist die Implementierung eines einfachen Quellkonnektors vollständig. Um diesen Konnektor nun in Kafka-Connect zu benutzen, muss ein Archiv erstellt werden, das sowohl die Implementierung enthält als auch alle relevanten Third-Party-Bibliotheken. Dies kann mit dem Maven-Assembly-Plugin4 erreicht werden (Listing 7). Dazu packt man das Artefakt (1) inklusive seiner Runtime-Dependencies (2) in ein eindeutig benanntes Zip-Archiv.
(Listing 7)
Das so erzeugte Archiv enthält dann alles notwendige, um den Konnektor zu distribuieren. Die Installation in Kafka-Connect gestaltet sich dann auch sehr einfach. Das Archiv wird entpackt und das neu entstandene Verzeichnis wird in den Plug-in-Pfad von Kafka-Connect kopiert. Beim nächsten Starten steht der Konnektor dann zur Verfügung.
Fazit:
Um große Datenmengen einfach nach Kafka oder aus Kafka heraus zu bewegen, kann man entweder die Kafka-API direkt benutzen oder auf Kafka-Connect zurückgreifen. Kafka-Connect bietet mit seiner API einen wesentlich einfacheren Weg, um mit Kafka zu interagieren und denkt dabei auch schon an Ausfallsicherheit, Fehlertoleranz und Performance. Die API ist leicht verständlich und ermöglicht es sehr schnell Konnektoren für beliebige Drittsysteme zu entwickeln.
Nicolai Mainiero ist Diplom-Informatiker und arbeitet als Software Developer bei der Firma sidion. Er entwickelt seit über zwölf Jahren Geschäftsanwendungen in Java, Kotlin und PHP für unterschiedlichste Kundenprojekte. Dabei setzt er vor allem auf agile Methoden wie Kanban. Außerdem interessiert er sich für funktionale Programmierung, Microservices und reaktive Anwendungen.