Signal per SSE, Daten per REST – eine Vaadin-Demonstration in Core Java

Sven Ruppert

1. Einleitung

1.1 Motivation: Ereignisgetriebene Aktualisierung ohne Polling

In klassischen Webanwendungen dominiert nach wie vor das Pull-Prinzip: Clients stellen wiederholt Anfragen an den Server, um Veränderungen zu erkennen. Dieses Polling ist einfach, führt aber zu unnötiger Last auf Server- und Netzwerkseite, insbesondere wenn sich der Datenbestand nur sporadisch ändert. Mit Server-Sent Events (SSE) steht ein standardisiertes Verfahren zur Verfügung, mit dem der Server Änderungen aktiv an verbundene Clients signalisieren kann. Dadurch lassen sich unnötige Anfragen vermeiden, während Aktualisierungen zeitnah an die Oberfläche gelangen.

Den Quellcode findest Du auf gitHub unter

https://github.com/Java-Publications/Blog—Vaadin—How-to-consume-SSE-from-a-REST-Service

https://github.com/Java-Publications/Blog—Vaadin—How-to-consume-SSE-from-a-REST-Service

1.2 Zielsetzung und Abgrenzung

Dieser Beitrag verfolgt das Ziel, die grundlegende Funktionsweise von SSE im Zusammenspiel mit einer Vaadin-Flow-Anwendung zu demonstrieren. Der Schwerpunkt liegt dabei auf der Trennung von Signalisierung und Datenabruf: Während SSE ausschließlich für Benachrichtigungen eingesetzt wird, erfolgt der eigentliche Abruf der neuen Daten über REST-Endpunkte. Sicherheitsaspekte wie Authentisierung oder Autorisierung sowie persistente Datenhaltung werden bewusst ausgeklammert, um die Kernidee klar herauszustellen.

1.3 Überblick über das Demonstrationsszenario

Der Demonstrator besteht aus drei Komponenten:

  • REST/SSE-Server mit CLI-Eingabe: Neue Daten werden während der Laufzeit direkt über die Konsole eingegeben und in einem einfachen In-Memory-Speicher abgelegt. Jede Eingabe löst unmittelbar ein SSE-Signal aus.
  • SSE-Signal: Der Server sendet nach jeder Eingabe ein Ereignis, das den Clients mitteilt, dass neue Daten verfügbar sind.
  • Vaadin-Flow-Anwendung: Die UI registriert sich als SSE-Client, zeigt Benachrichtigungen über neue Daten an und erlaubt den Nutzenden, diese per REST-Abruf gezielt nachzuladen und darzustellen.

Mit dieser Konstellation entsteht ein klar nachvollziehbares Beispiel für die Integration von SSE in ein serverseitiges Java-UI-Framework. Die Trennung zwischen Signal und Daten erlaubt eine robuste und leicht verständliche Architektur, die sich für verschiedenste Echtzeitszenarien eignet.

2. Konzeptuelle Grundlagen

2.1 Server-Sent Events (SSE)

Server-Sent Events (SSE) sind ein standardisiertes Verfahren, um vom Server aus kontinuierlich Nachrichten an den Client zu übertragen. Die Kommunikation erfolgt unidirektional: Der Server sendet, der Client empfängt. Technisch basiert SSE auf einer persistenten HTTP-Verbindung mit dem MIME-Typ text/event-stream. Nachrichten bestehen aus einfachen Textzeilen und werden vom Browser oder einem Client-Framework interpretiert.

Die Vorteile von SSE liegen in der Einfachheit der Implementierung, automatischem Reconnect auf Client-Seite und der guten Integration in bestehende HTTP-Infrastrukturen. Einschränkungen ergeben sich durch die Beschränkung auf UTF-8-Text und die fehlende Möglichkeit bidirektionaler Kommunikation. Für reine Benachrichtigungen und Statusmeldungen ist SSE jedoch meist effizienter und robuster als komplexere Alternativen wie WebSockets oder Long Polling.

2.2 REST als Transportkanal für Nutzdaten

REST-basierte Endpunkte haben sich als Standard zur Übertragung strukturierter oder binärer Daten etabliert. Sie sind zustandslos, einfach skalierbar und profitieren von etablierter Infrastruktur wie Caching und Monitoring. Im vorliegenden Demonstrationsszenario dient REST dazu, die eigentlichen Daten bereitzustellen, die nach einer Benachrichtigung über SSE explizit vom Client angefordert werden. Diese Trennung sorgt für klare Verantwortlichkeiten: SSE signalisiert, dass sich etwas geändert hat; REST liefert die Inhalte.

2.3 Vaadin Flow: Serverseitiges UI-Modell und Push

Vaadin Flow ist ein serverseitiges Java-Framework für die Entwicklung von Webanwendungen. Die UI-Logik läuft auf dem Server, während der Browser nur die Darstellung übernimmt. Änderungen im Serverzustand werden über einen synchronisierten Kommunikationskanal an den Client übertragen. Für die Integration von SSE ist dabei besonders relevant, dass Vaadin mit Push arbeitet: Server-seitige Ereignisse können direkt in die Oberfläche übertragen werden. Damit lässt sich der Empfang von SSE-Signalen elegant mit UI-Updates verbinden, ohne dass clientseitiges JavaScript erforderlich ist.

3. Architektur des Demonstrators

3.1 Komponentenübersicht

Der Demonstrator besteht aus drei Hauptkomponenten. Die erste Komponente ist der REST/SSE-Server. Er ist dafür zuständig, einen Endpunkt für die Eingabe neuer Daten bereitzustellen, diese Daten im Speicher abzulegen und gleichzeitig über SSE Signale an die verbundenen Clients zu senden, sobald sich der Datenbestand verändert. Zusätzlich stellt er REST-Endpunkte zur Verfügung, über die die Clients die aktuellen Daten abrufen können.

Die zweite Komponente bildet die In-Process-CLI, die direkt in den Serverprozess eingebettet ist. Sie ermöglicht es, während der Laufzeit manuell neue Datenzeilen über die Konsole einzugeben. Jede Eingabe wird unmittelbar als Datensatz im In-Memory-Speicher abgelegt. Gleichzeitig löst dieser Vorgang ein Ereignis aus, das per SSE an die verbundenen Clients weitergeleitet wird.

Als dritte Komponente kommt die Vaadin-Flow-Anwendung hinzu. Sie fungiert als Client, der die über SSE gesendeten Signale entgegennimmt und die Nutzerinnen und Nutzer darüber informiert, dass neue Daten verfügbar sind. Auf Wunsch kann die Anwendung diese Daten gezielt über die bereitgestellten REST-Endpunkte nachladen und direkt in der Benutzeroberfläche darstellen. Damit wird der gesamte Ablauf von der Eingabe über die Signalisierung bis zur Sichtbarkeit im UI abgedeckt.

3.2 Kommunikationsbeziehungen

Die Architektur folgt dem Muster Signal per SSE, Daten per REST. Sobald die CLI eine neue Eingabe erhält, speichert der Server den Eintrag im Speicher und sendet ein entsprechendes SSE-Update an alle verbundenen Clients. Die Vaadin-Anwendung empfängt dieses Signal, informiert den Nutzer über die Verfügbarkeit neuer Daten und bietet eine Option zum gezielten Abruf. Über einen anschließenden REST-Request fordert die Vaadin-Anwendung die Daten an und aktualisiert die Benutzeroberfläche.

3.3 Laufzeitumgebung und Annahmen

Für die Demonstration gelten einige bewusst vereinfachte Rahmenbedingungen. Server und Vaadin-Anwendung laufen lokal auf demselben Host, jedoch in getrennten Prozessen und typischerweise auch auf unterschiedlichen Ports, beispielsweise 8080 für den Server und 8081 für die UI. Dadurch wird eine realitätsnahe Trennung der Systeme simuliert, ohne dass eine komplexe Infrastruktur erforderlich ist.

Darüber hinaus ist die CORS-Policy so eingestellt, dass Zugriffe ohne Einschränkung möglich sind. Diese offene Konfiguration dient allein der Vereinfachung der Demonstration, da Sicherheitsaspekte in diesem Szenario nicht berücksichtigt werden.

Die Speicherung der Daten erfolgt ausschließlich im Arbeitsspeicher. Jeder eingegebene Datensatz bleibt nur während der Laufzeit verfügbar und geht nach dem Beenden des Prozesses verloren. Persistenzmechanismen wie Datenbanken oder Dateisysteme werden in diesem Beispiel bewusst nicht eingesetzt, um den Fokus vollständig auf das Zusammenspiel von CLI-Eingabe, SSE-Signal und REST-Abruf zu legen.

Diese Architektur ist bewusst einfach gehalten, um die Funktionsweise von SSE in Kombination mit REST und Vaadin Flow klar und nachvollziehbar zu demonstrieren.

4. Daten- und Ereignismodell

4.1 Datensatzstruktur

Im Demonstrationsszenario werden neue Daten durch die Eingabe von Textzeilen in der CLI erzeugt. Jeder Eintrag wird im Server in Form eines einfachen Datensatzes abgelegt. Dieser besteht aus einer fortlaufenden Sequenznummer, die bei jeder Eingabe erhöht wird, einem Zeitstempel, der den Zeitpunkt der Erfassung dokumentiert, sowie dem eigentlichen Inhalt, also dem vom Operator eingegebenen Text. Es wird dabei implizit davon ausgegangen, dass sowohl der REST/SSE-Server als auch die Vaadin-Anwendung auf derselben Systemzeit basieren. Nur so lassen sich die Zeitstempel unmittelbar vergleichen, ohne dass zusätzliche Synchronisationsmechanismen erforderlich sind. Diese Struktur ist bewusst minimalistisch gehalten, um die Nachvollziehbarkeit zu gewährleisten und den Fokus auf das Zusammenspiel der Komponenten zu richten.

4.2 Ereignistypen und Semantik

Die Kommunikation zwischen Server und Client erfolgt über Ereignisse, die im SSE-Format übertragen werden. Im Zentrum steht dabei das Ereignis update, das immer dann gesendet wird, wenn ein neuer Datensatz aufgenommen wurde. Als Datenlast dieses Ereignisses wird mindestens die aktuelle Sequenznummer übermittelt, sodass der Client erkennen kann, ob für ihn neue Einträge vorliegen. Optional könnte auch ein Ereignis init zum Einsatz kommen, das beim Aufbau einer neuen Verbindung den Startzustand signalisiert. Weitere Ereignistypen wie reset oder snapshot lassen sich für erweiterte Szenarien vorsehen, sind aber für die Demonstration nicht erforderlich.

4.3 Sequenznummerierung und idempotentes Nachladen

Die fortlaufende Sequenznummer spielt eine zentrale Rolle für die Konsistenz zwischen Server und Client. Sie ermöglicht es, bei einem Abruf über REST gezielt nur die Datensätze anzufordern, die seit der letzten bekannten Nummer hinzugekommen sind. Damit können Clients auch dann korrekt synchronisiert bleiben, wenn sie ein oder mehrere SSE-Signale verpasst haben. Das Nachladen ist dadurch idempotent: Ein erneuter Abruf mit derselben since-Angabe liefert stets dieselbe Ergebnismenge, unabhängig davon, wie oft er wiederholt wird. Dieses Prinzip erleichtert die Fehlertoleranz und sorgt für eine stabile Verarbeitungskette.

5. REST/SSE-Server mit In-Process-CLI

5.1 Bedienkonzept der CLI

Der Serverprozess verfügt über eine integrierte Kommandozeilen-Schnittstelle, über die während der Laufzeit neue Datensätze eingegeben werden können. Jede Eingabe entspricht einer einzelnen Textzeile, die direkt in den In-Memory-Speicher übernommen wird. Dieses einfache Bedienkonzept erlaubt es, die Datenquelle manuell zu steuern und so gezielt Ereignisse auszulösen, die anschließend über SSE an die Clients signalisiert werden.

5.2 Append-Log und Konsistenzüberlegungen

Die Speicherung der Eingaben erfolgt in Form eines Append-Logs: Jeder neue Datensatz wird am Ende der bestehenden Liste angefügt. Die fortlaufende Sequenznummer gewährleistet, dass sich die Reihenfolge eindeutig bestimmen lässt. Durch diese einfache Struktur ist sichergestellt, dass alle Clients bei Bedarf konsistent denselben Zustand nachladen können. Komplexere Mechanismen wie Transaktionen oder Sperren sind für die Demonstration nicht erforderlich.

5.3 SSE-Broadcast: Trigger bei erfolgreicher Eingabe

Sobald ein neuer Datensatz in den Speicher aufgenommen wurde, erzeugt der Server unmittelbar ein SSE-Ereignis vom Typ update. Dieses Ereignis wird an alle verbundenen Clients ausgesendet und enthält mindestens die aktuelle Sequenznummer. Damit werden alle registrierten Empfänger in die Lage versetzt zu erkennen, dass neue Daten verfügbar sind. Der eigentliche Inhalt wird nicht übertragen, sondern bleibt dem späteren Abruf über REST vorbehalten.

5.4 Optionale Optimierungen

Für ein Demonstrationsszenario genügt es, jedes neue Ereignis sofort weiterzuleiten. In realitätsnäheren Umgebungen können jedoch Optimierungen sinnvoll sein. Dazu zählt etwa das Zusammenfassen mehrerer schneller Eingaben in einem kombinierten Update-Signal, um die Anzahl der übertragenen Nachrichten zu reduzieren. Ebenso können regelmäßige Ping-Kommentare verwendet werden, um die Verbindung zu stabilisieren und sicherzustellen, dass inaktive Clients erkannt und entfernt werden. Diese Maßnahmen erhöhen die Robustheit, sind für die funktionale Demonstration jedoch nicht zwingend notwendig. Konzepte dieser Art werden wir in einem weiteren Artikel aufgreifen, wenn wir uns mit dem Open-Source-Projekt des URL-Shorteners beschäftigen.

6. Vaadin-Flow-Integration (ohne JavaScript)

6.1 Serverseitiger SSE-Client

In der Vaadin-Flow-Anwendung wird ein serverseitiger SSE-Client betrieben, der dauerhaft mit dem SSE-Endpunkt des REST-Servers verbunden ist. Dadurch kann die Anwendung unabhängig von clientseitigem JavaScript Signale empfangen und unmittelbar auf Serverseite verarbeiten. Kommt es zu Verbindungsabbrüchen, wird automatisch ein Reconnect-Versuch gestartet, sodass die UI kontinuierlich über den aktuellen Stand informiert bleibt.

6.2 UI-Synchronisation und Push

Da Vaadin Flow nach dem Prinzip eines serverseitigen UI-Modells arbeitet, müssen externe Ereignisse in den UI-Thread integriert werden. Dies geschieht über synchronisierte Zugriffe, die sicherstellen, dass Änderungen konsistent und threadsicher ausgeführt werden. In Kombination mit Vaadin Push können so eingehende SSE-Signale direkt in sichtbare Aktualisierungen umgesetzt werden. Nutzerinnen und Nutzer bemerken die Aktualisierung unmittelbar, ohne dass ein manueller Refresh erforderlich ist. Den Sonderfall eines kompletten Page-Reloads lassen wir an dieser Stelle bewusst außer Acht.

6.3 Interaktionsmuster

Die Vaadin-Oberfläche ist so gestaltet, dass sie bei einem eingehenden SSE-Signal nicht automatisch alle neuen Daten lädt, sondern zunächst lediglich einen Hinweis anzeigt. Dies kann in Form einer Benachrichtigung oder durch die Aktivierung eines Buttons erfolgen. Erst durch eine bewusste Aktion des Nutzers werden die neuen Daten über REST abgerufen und in die Oberfläche eingebunden. Zudem kann der User auf diese Weise selektiv entscheiden, welche Detaildaten von Interesse sind. Dieses Interaktionsmuster verdeutlicht die Trennung von Signalisierung und Datenabruf und macht die Funktionsweise für Demonstrationszwecke besonders anschaulich.

6.4 Delta-Abruf und Vollabruf

Beim Nachladen der Daten über REST kann zwischen zwei Modi unterschieden werden: Ein Vollabruf lädt stets den gesamten Datenbestand, während ein Delta-Abruf nur diejenigen Einträge abruft, die seit der letzten bekannten Sequenznummer hinzugekommen sind. Steht keine Sequenznummer zur Verfügung, wird nicht der gesamte Datenbestand übertragen, sondern es werden lediglich die letzten n Nachrichten geladen. Für die Demonstration genügt diese Vorgehensweise, da sie die Nachvollziehbarkeit erleichtert. In produktionsnäheren Szenarien bietet der Delta-Abruf dennoch Vorteile in Bezug auf Effizienz und Bandbreite.

7. Implementierung – REST SSE Server

7.1 REST-Server

Der RestServer bündelt die gesamte Serverfunktionalität auf Basis von com.sun.net.httpserver.HttpServer. Er initialisiert die drei Endpunkte des Demonstrators (/sse, /data, /health) und verwaltet die für SSE benötigten Serverressourcen. Dazu zählen die Menge der aktuell verbundenen Ausgabeströme (sseClients), ein threadsicher geführtes In‑Memory‑Append‑Log der Datensätze sowie die zur Laufzeit notwendigen Ausführungsdienste (Scheduled Executor für Keep‑Alive‑Pings und ein Connection‑Executor für langlebige SSE‑Verbindungen).

Im Betrieb startet der Server einen CLI‑Thread, der Zeilen von stdin entgegennimmt und als neue Datensätze in das Log einfügt. Jeder Eintrag erhält eine monotone Sequenznummer und einen Zeitstempel; im Anschluss wird ein SSE‑Update an alle verbundenen Clients ausgesendet. Der RestServer stellt darüber hinaus Hilfsfunktionen für CORS‑Header, standardisierte Antworten, Query‑Parsing und die Ausgabe des SSE‑Formats bereit. Die Methoden since(long) und lastN(int) bilden die REST‑Lesepfade ab: Sie liefern entweder alle Einträge nach einer bestimmten Sequenznummer oder die letzten n Einträge zur initialen Synchronisierung, falls noch keine Sequenz bekannt ist.

package com.svenruppert.rest;

public class RestServer {
  public static final int DEFAULT_LAST_N = 20;
  public static final String PATH_SSE = "/sse";
  public static final String PATH_DATA = "/data";
  public static final String PATH_HEALTH = "/health";
  public static final String CONTENT_TYPE = "text/plain; charset=utf-8";
  public static final int PORT = 8080;
  public static final long PING_INTERVAL_MILLIS = 30_000L;
  
protected static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
  
private final HttpServer http;
  private final Set<OutputStream> sseClients = ConcurrentHashMap.newKeySet();
  private final CopyOnWriteArrayList<Entry> store = new CopyOnWriteArrayList<>();
  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
  private final ExecutorService connectionExecutor = Executors.newCachedThreadPool();
  private final AtomicLong seq = new AtomicLong(0);

  public RestServer(int port)
      throws IOException {
    http = HttpServer.create(new InetSocketAddress(port), 0);
    http.createContext(PATH_SSE, new SseHandler(this));
    http.createContext(PATH_DATA, new DataHandler(this));
    http.createContext(PATH_HEALTH, ex -> respond(ex, 200, "OK"));
    http.setExecutor(connectionExecutor);
  }

  // --- Main ---
  public static void main(String[] args)
      throws Exception {
    RestServer srv = new RestServer(PORT);
    Runtime.getRuntime().addShutdownHook(new Thread(srv::stop));
    srv.start();
  }

  public AtomicLong getSeq() {
    return seq;
  }

  public Set<OutputStream> getSseClients() {
    return sseClients;
  }

  public ScheduledExecutorService getScheduler() {
    return scheduler;
  }

  public ExecutorService getConnectionExecutor() {
    return connectionExecutor;
  }

  public void writeEvent(OutputStream os, String event, String data)
      throws IOException {
    os.write(sseFormat(event, data));
    os.flush();
  }

  public void writeComment(OutputStream os, String comment)
      throws IOException {
    os.write(("# " + comment + "\n\n").getBytes(StandardCharsets.UTF_8));
    os.flush();
  }

  public byte[] sseFormat(String event, String data) {
    String msg = "event: " + event + "\n" + "data: " + data + "\n\n";
    return msg.getBytes(StandardCharsets.UTF_8);
  }

  // --- Utils ---
  public void addCors(HttpExchange ex) { ex.getResponseHeaders().add(ACCESS_CONTROL_ALLOW_ORIGIN, "*"); }

  public void respond(HttpExchange ex, int code, String body)
      throws IOException { respond(ex, code, body, CONTENT_TYPE); }
  public void respond(HttpExchange ex, int code, String body, String contentType)
      throws IOException {
    Headers h = ex.getResponseHeaders();
    h.add("Content-Type", contentType);
    byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
    ex.sendResponseHeaders(code, bytes.length);
    try (OutputStream os = ex.getResponseBody()) {
      os.write(bytes);
    }
  }

  public Map<String, List<String>> parseQuery(URI uri) {
    Map<String, List<String>> map = new LinkedHashMap<>();
    String query = uri.getRawQuery();
    if (query == null || query.isEmpty()) return map;
    for (String pair : query.split("&")) {
      int idx = pair.indexOf('=');
      String key = idx > 0 ? decode(pair.substring(0, idx)) : decode(pair);
      String val = idx > 0 && pair.length() > idx + 1 ? decode(pair.substring(idx + 1)) : "";
      map.computeIfAbsent(key, k -> new ArrayList<>()).add(val);
    }
    return map;
  }

  public String decode(String s) { return URLDecoder.decode(s, StandardCharsets.UTF_8); }
  public String first(List<String> list) { return (list == null || list.isEmpty()) ? null : list.get(0); }
  // --- Start / Stop ---
  public void start() {
    http.start();
    System.out.println("REST/SSE server running on http://localhost:" + PORT);
    // CLI-Thread für In-Process-Eingaben
    Thread cli = new Thread(this::cliLoop, "cli-loop");
    cli.setDaemon(true);
    cli.start();
  }

  public void stop() {
    try {
      http.stop(0);
    } catch (Exception ignored) {
    }
    try {
      scheduler.shutdownNow();
    } catch (Exception ignored) {
    }
    try {
      connectionExecutor.shutdownNow();
    } catch (Exception ignored) {
    }
    for (OutputStream os : sseClients) {
      try {
        os.close();
      } catch (IOException ignored) {
      }
    }
    sseClients.clear();
  }

  // --- CLI ---
  protected void cliLoop() {
    System.out.println("CLI bereit. Tippe Textzeilen ein. 'exit' beendet den Server.");
    try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
      String line;
      while ((line = br.readLine()) != null) {
        if (line.equalsIgnoreCase("exit")) {
          System.out.println("Beende Server…");
          stop();
          break;
        }
        if (line.isBlank()) {
          System.out.println("(leere Zeile ignoriert)");
          continue;
        }
        appendNew(line);
      }
    } catch (IOException e) {
      System.err.println("CLI beendet: " + e.getMessage());
    }
  }

  private void appendNew(String text) {
    long n = seq.incrementAndGet();
    Entry e = new Entry(n, Instant.now(), text);
    store.add(e);
    System.out.println("[APPEND] " + e);
    broadcastUpdate(n);
  }

  private void broadcastUpdate(long highestSeq) {
    String payload = Long.toString(highestSeq);
    byte[] bytes = sseFormat("update", payload);
    List<OutputStream> dead = new ArrayList<>();
    for (OutputStream os : sseClients) {
      try {
        os.write(bytes);
        os.flush();
      } catch (IOException e) {
        dead.add(os);
      }
    }
    if (!dead.isEmpty()) sseClients.removeAll(dead);
  }

  public List<Entry> since(long s) {
    List<Entry> out = new ArrayList<>();
    for (Entry e : store) if (e.seq() > s) out.add(e);
    return out;
  }

  public List<Entry> lastN(int n) {
    int size = store.size();
    if (n <= 0) return List.of();
    int from = Math.max(0, size - n);
    return new ArrayList<>(store.subList(from, size));
  }
}

7.2 Entry

Entry modelliert einen einzelnen Datensatz als Record mit den Feldern seq (Sequenznummer), ts (Zeitstempel) und text (Nutzlast aus der CLI). Der Record ist unveränderlich und damit gut für nebenläufige Lesezugriffe geeignet. Die toString()‑Darstellung ist bewusst schlicht gehalten und gibt die drei Felder in einer Zeile aus; sie dient der textbasierten Übertragung über den /data‑Endpunkt und unterstützt das einfache Parsen auf der Client‑Seite.

package com.svenruppert.rest;
// --- Datenmodell ---
public record Entry(long seq, Instant ts, String text) {
  @NotNull
  @Override
  public String toString() {
    return seq + "|" + DateTimeFormatter.ISO_INSTANT.format(ts) + "|" + text;
  }
}

7.3 SseHandler

Der SseHandler implementiert den SSE‑Endpunkt /sse. Beim Aufruf richtet er die HTTP‑Antwort für einen text/event-stream ein, fügt den Ausgabestrom des Clients in die verwaltete Menge der SSE‑Verbindungen ein und sendet ein initiales Ereignis zur Bestätigung des erfolgreichen Verbindungsaufbaus. Ein periodischer Keep‑Alive‑Mechanismus (Ping‑Kommentare) hält die Verbindung offen und unterstützt die Erkennung abgebrochener Clients. Der Handler arbeitet eng mit den vom RestServer bereitgestellten Hilfsfunktionen zusammen (Schreiben einzelner Events bzw. Kommentare, Zugriff auf Scheduler und Verbindungsverwaltung).

Kommt es zu einem Schreibfehler oder zum Schließen der Gegenstelle, entfernt der Handler die Verbindung aus der Menge der aktiven Streams und räumt zugehörige Ressourcen auf. In Zusammenspiel mit dem Broadcasting des Servers entsteht so ein einfaches, aber belastbares Push‑Modell für Signale über neu verfügbare Daten.

package com.svenruppert.rest.handler;

// --- SSE ---
public record SseHandler(RestServer restServer)
    implements HttpHandler {
  @Override
  public void handle(HttpExchange ex)
      throws IOException {
    if (!"GET".equals(ex.getRequestMethod())) {
      restServer.respond(ex, 405, "Method Not Allowed");
      return;
    }

    restServer.addCors(ex);
    Headers h = ex.getResponseHeaders();
    h.add("Content-Type", "text/event-stream; charset=utf-8");
    h.add("Cache-Control", "no-cache");
    h.add("Connection", "keep-alive");
    ex.sendResponseHeaders(200, 0);
    final OutputStream os = ex.getResponseBody();
    restServer.getSseClients().add(os);
    // Initiales Event
    restServer.writeEvent(os, "init", "ready");
    // Keep-Alive Pings
    ScheduledFuture<?> pinger = restServer.getScheduler().scheduleAtFixedRate(() -> {
      try {
        restServer.writeComment(os, "ping");
      } catch (IOException e) { /* will be closed below */ }
    }, RestServer.PING_INTERVAL_MILLIS, RestServer.PING_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
    // Blockierend offen halten, bis Client/OS schließt
    restServer.getConnectionExecutor().execute(() -> {
      try (os) {
        // NOP: wir schreiben nur bei Events, sonst hält Ping die Leitung offen
        // Warten bis eine Ausnahme auftritt / OS geschlossen wird
        // (ein explizites Read/Write-Loop ist hier nicht nötig)
        Thread.currentThread().join();
      } catch (InterruptedException ignored) {
      } catch (Exception ignored) {
      } finally {
        pinger.cancel(true);
        restServer.getSseClients().remove(os);
        try {
          os.close();
        } catch (IOException ignored) {
        }
      }
    });
  }
}

7.4 DataHandler

Der DataHandler stellt den REST‑Endpunkt /data bereit und realisiert die zwei vorgesehenen Lesevarianten. Je nach Query‑Parametern liefert er entweder alle Einträge mit einer Sequenznummer größer als ein gegebener since‑Wert oder – falls keine Sequenznummer bekannt ist – standardmäßig die letzten n Einträge (konfigurierbar über DEFAULT_LAST_N bzw. den Parameter lastN). Die Antwort wird als text/plain erzeugt, wobei jeder Datensatz in einer eigenen Zeile ausgegeben wird.

Der Handler übernimmt zudem die Validierung der Query‑Parameter und erzeugt konsistente Fehlermeldungen bei ungültigen Eingaben. Gemeinsam mit dem SseHandler bildet er damit die Trennung von Signalisierung (SSE) und Nutzdatenabruf (REST) ab und erlaubt einen deterministischen, idempotenten Nachladeprozess auf Client‑Seite.

package com.svenruppert.rest.handler;

// --- DATA ---
public record DataHandler(RestServer restServer)
    implements HttpHandler {
  @Override
  public void handle(HttpExchange ex)
      throws IOException {
    if (!"GET".equals(ex.getRequestMethod())) {
      restServer.respond(ex, 405, "Method Not Allowed");
      return;
    }
    restServer.addCors(ex);
    Map<String, List<String>> q = restServer.parseQuery(ex.getRequestURI());
    String sinceStr = restServer.first(q.get("since"));
    String lastNStr = restServer.first(q.get("lastN"));
    List<Entry> result;
    if (sinceStr != null && !sinceStr.isBlank()) {
      long s;
      try {
        s = Long.parseLong(sinceStr);
      } catch (NumberFormatException nfe) {
        restServer.respond(ex, 400, "since must be a number");
        return;
      }
      result = restServer.since(s);
    } else {
      int n = RestServer.DEFAULT_LAST_N;
      if (lastNStr != null && !lastNStr.isBlank()) {
        try {
          n = Integer.parseInt(lastNStr);
        } catch (NumberFormatException nfe) {
          restServer.respond(ex, 400, "lastN must be a number");
          return;
        }
      }
      result = restServer.lastN(n);
    }
    // Ausgabe als text/plain, eine Zeile pro Eintrag: seq|ISO-TS|text
    StringBuilder sb = new StringBuilder();
    for (Entry e : result) {
      sb.append(e.toString()).append('\n');
    }
    restServer.respond(ex, 200, sb.toString(), RestServer.CONTENT_TYPE);
  }
}

8. Implementierung – Vaadin Flow UI

8.1 DashboardView

Die DashboardView ist die zentrale View der Demonstration und wird über die Route dashboard eingebunden. Sie präsentiert den aktuellen Datenbestand in einem Grid und stellt mit einem Button das bewusste Nachladen bereit. Beim Attach registriert sich die View beim UiBroadcaster und bindet einen Listener am SseClientService. Trifft ein „update“-Signal ein, informiert die View die Nutzenden (Statuszeile, Notification) und aktiviert den Nachlade‑Button. Der eigentliche Abruf erfolgt erst auf Nutzeraktion: Entweder werden neue Einträge seit der letzten bekannten Sequenz (lastSeq) geladen oder – wenn noch keine Sequenz vorliegt – die letzten n Nachrichten. Nach erfolgreichem Abruf wird lastSeq aktualisiert und das Grid konsistent fortgeschrieben. Beim Detach entfernt sich die View aus dem Broadcaster und deregistriert den Listener.

package com.svenruppert.flow.views.dashboard;

@Route(value = DashboardView.ROUTE, layout = MainLayout.class)
public class DashboardView
    extends Composite<VerticalLayout>
    implements HasLogger {
  public static final String ROUTE = "dashboard";

  // --- Konfiguration ---
  private static final String BASE = "http://localhost:8090"; // Server-Basis
  private static final String SSE_URL = BASE + "/sse";
  private static final String DATA_URL = BASE + "/data";
  private static final int DEFAULT_LAST_N = 20;

  // --- UI ---
  private final Grid<Entry> grid = new Grid<>(Entry.class, false);
  private final Button fetchBtn = new Button("Daten holen");
  private final Span status = new Span("Wartend auf Ereignisse …");

  // --- Client-Services ---
  private final DataClient dataClient = new DataClient(DATA_URL);
  private final SseClientService sseClient = new SseClientService(SSE_URL);
  private final AtomicBoolean hasNew = new AtomicBoolean(false);

  // --- Zustand ---
  private volatile Long lastSeq = null; // letzte bestätigte Sequenz
  public DashboardView() {
    getContent().setSizeFull();  grid.addColumn(Entry::seq).setHeader("Seq").setAutoWidth(true).setFlexGrow(0);
    grid.addColumn(e -> e.ts().toString()).setHeader("Zeitstempel").setAutoWidth(true).setFlexGrow(0);
    grid.addColumn(Entry::text).setHeader("Text").setFlexGrow(1);
    fetchBtn.setEnabled(false);
    fetchBtn.addClickListener(e -> fetch());
    getContent().add(status, fetchBtn, grid);
    addAttachListener(ev -> {
      UI ui = ev.getUI();
      UiBroadcaster.register(ui);
      SseClientService.Listener l = (type, data) -> {
        if ("update".equals(type)) {
          try {
            long seq = Long.parseLong(data.trim());
            // nur markieren; Fetch entscheidet über Delta/lastN
            hasNew.set(true);
            UiBroadcaster.broadcast(() -> {
              fetchBtn.setEnabled(true);
              status.setText("Neue Daten verfügbar (seq=" + seq + ") – bitte abrufen.");
              Notification.show("Neue Daten verfügbar", 1200, Notification.Position.TOP_CENTER);
            });
          } catch (NumberFormatException ignore) {
            // Fallback: Hinweis ohne seq
            hasNew.set(true);
            UiBroadcaster.broadcast(() -> {
              fetchBtn.setEnabled(true);
              status.setText("Neue Daten verfügbar – bitte abrufen.");
            });
          }
        }
      };

      sseClient.addListener(l);
      addDetachListener(ev2 -> {
        sseClient.removeListener(l);
        UiBroadcaster.unregister(ui);
      });
    });
  }

  private void fetch() {
    fetchBtn.setEnabled(false);
    List<Entry> toShow;
    if (lastSeq != null) {
      toShow = dataClient.fetchSince(lastSeq);
    } else {
      toShow = dataClient.fetchLastN(DEFAULT_LAST_N);
    }
    if (!toShow.isEmpty()) {
      lastSeq = toShow.getLast().seq(); // neueste merken
    }
    UI ui = UI.getCurrent();
    if (ui != null) {
      // effectively final
      ui.access(() -> {
        if (!toShow.isEmpty()) {
          List<Entry> current = new ArrayList<>(grid.getListDataView().getItems().toList());
          current.addAll(toShow);
          grid.setItems(current);
          status.setText("Daten geladen (" + current.size() + " Einträge).");
        } else {
          status.setText("Keine neuen Einträge.");
        }
        hasNew.set(false);
      });
    }
  }
}

8.2 DataClient

Der DataClient kapselt die REST‑Kommunikation mit dem Endpunkt /data. Er bietet zwei Lesewege: fetchSince(long since) für Delta‑Abrufe und fetchLastN(int n) für die initiale Synchronisierung. Die Antworten werden als text/plain erwartet und in eine Liste von Entry konvertiert (Format: seq|ISO‑TS|text, eine Zeile pro Datensatz). Durch die Kapselung der HTTP‑Details bleibt die View schlank; Änderungen am Transportformat oder an Timeouts lassen sich zentral anpassen, ohne UI‑Code zu berühren.

package com.svenruppert.flow.views.dashboard;

// --- REST-Client für /data ---
public final class DataClient {
  private final HttpClient http = HttpClient.newBuilder()
      .connectTimeout(Duration.ofSeconds(5)).build();
  private final String baseUrl;

  public DataClient(String baseUrl) {
    this.baseUrl = baseUrl;
  }

  // Serverformat: seq|ISO-TS|text pro Zeile
  private static List<Entry> parse(String body)
      throws IOException {
    List<Entry> out = new ArrayList<>();
    if (body == null || body.isBlank()) return out;
    String[] lines = body.split("\\R");
    for (String line : lines) {
      if (line.isBlank()) continue;
      String[] parts = line.split("\\|", 3);
      if (parts.length < 3) continue;
      long seq = Long.parseLong(parts[0]);
      Instant ts = Instant.parse(parts[1]);
      String text = parts[2];
      out.add(new Entry(seq, ts, text));
    }
    return out;
  }

  public List<Entry> fetchSince(long since) {
    String url = baseUrl + "?since=" + since;
    return fetch(url);
  }

  public List<Entry> fetchLastN(int n) {
    String url = baseUrl + "?lastN=" + n;
    return fetch(url);
  }

  private List<Entry> fetch(String url) {
    try {
      HttpRequest req = HttpRequest.newBuilder(URI.create(url))
          .timeout(Duration.ofSeconds(5)).GET().build();
      HttpResponse<String> resp = http.send(req, HttpResponse.BodyHandlers.ofString());
      if (resp.statusCode() != 200) return List.of();
      return parse(resp.body());
    } catch (Exception e) {
      return List.of();
    }
  }
}

8.3 Entry

Entry repräsentiert einen einzelnen Datensatz mit monotoner Sequenznummer, Zeitstempel und Nachrichteninhalt. Der unveränderliche Record eignet sich gut für die Anzeige im Grid und für die Nebenläufigkeit in der View. In der Vaadin‑Demonstration dient Entry als leichtgewichtiges Transport‑ und Anzeigeobjekt, das direkt aus der REST‑Antwort erzeugt und unverändert weitergereicht wird.

package com.svenruppert.flow.views.dashboard;

// --- Datenmodell auf Client-Seite ---
public record Entry(long seq, Instant ts, String text) { }

8.4 SseClientService

Der SseClientService stellt den serverseitigen SSE‑Client der Vaadin‑Anwendung dar. Er hält eine langlebige HTTP‑Verbindung zu /sse, parst eingehende Ereignisse im Format text/event-stream und verteilt sie an registrierte Listener in der Anwendung. Nach Verbindungsabbrüchen wird automatisch ein Reconnect versucht. Die Service‑Grenze ist bewusst simpel: Ereignisse werden als type/data weitergereicht (insbesondere event: update, data: <seq>). Die View entscheidet anschließend, ob und wie nachgeladen wird. Damit bleibt die Trennung zwischen Signalisierung (SSE) und Datenübertragung (REST) gewahrt.

package com.svenruppert.flow.views.dashboard;

// --- SSE-Client (serverseitig) ---
public final class SseClientService {
  private final HttpClient http = HttpClient.newBuilder()
      .connectTimeout(Duration.ofSeconds(5)).build();

  private final String url;
  private final List<Listener> listeners = new CopyOnWriteArrayList<>();
  private volatile boolean running = false;

  public SseClientService(String url) {
    this.url = Objects.requireNonNull(url);
  }

  private static void sleep(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException ignored) {
    }
  }

  public void addListener(Listener l) {
    listeners.add(l);
    ensureLoop();
  }

  public void removeListener(Listener l) {
    listeners.remove(l);
  }

  private synchronized void ensureLoop() {
    if (running) return;
    running = true;
    Thread t = new Thread(this::loop, "sse-client-loop");
    t.setDaemon(true);
    t.start();
  }

  private void loop() {
    while (running) {
      try {
        HttpRequest req = HttpRequest.newBuilder(URI.create(url))
            .header("Accept", "text/event-stream")
            .timeout(Duration.ofSeconds(30)).GET().build();
        HttpResponse<InputStream> resp = http.send(req, HttpResponse.BodyHandlers.ofInputStream());
        if (resp.statusCode() != 200) {
          sleep(1500);
          continue;
        }
        try (var is = resp.body();
             var br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
          String line;
          String event = "message";
          StringBuilder data = new StringBuilder();
          while ((line = br.readLine()) != null) {
            if (line.isEmpty()) { // Event abschließen
              if (!data.isEmpty()) {
                fire(event, data.toString());
                data.setLength(0);
                event = "message";
              }
              continue;
            }
            if (line.startsWith(":")) continue; // Kommentar
            if (line.startsWith("event:")) {
              event = line.substring("event:".length()).trim();
            } else if (line.startsWith("data:")) {
              if (!data.isEmpty()) data.append('\n');
              data.append(line.substring("data:".length()).trim());
            }
          }
        }
      } catch (Exception e) {
        sleep(1500);
      }
    }
  }

  private void fire(String type, String data) {
    for (Listener l : listeners) {
      try {
        l.onEvent(type, data);
      } catch (Exception ignored) {
      }
    }
  }

  public interface Listener {
    void onEvent(String type, String data);
  }
}

8.5 UiBroadcaster

Der UiBroadcaster ist eine kleine Hilfsklasse, um UI‑Updates threadsicher und an mehrere aktive UIs zu verteilen. Er verwaltet eine Liste aktuell verbundener UI‑Instanzen und führt bereitgestellte Commands innerhalb des jeweiligen UI‑Kontextes aus (ui.access(…)). So lassen sich Reaktionen auf externe Signale (SSE) gefahrlos in die Vaadin‑UI integrieren, ohne UI‑Thread‑Verstöße zu riskieren. Die View registriert sich beim Attach und deregistriert sich beim Detach, wodurch Zombie‑Referenzen vermieden werden.

package com.svenruppert.flow.views.dashboard;

// --- Einfacher Broadcaster, um UI-Updates threadsicher zu verteilen ---
public final class UiBroadcaster {

  private UiBroadcaster() {
  }

  private static final List<UI> UI_LIST = new CopyOnWriteArrayList<>();

  public static void register(UI ui) {
    UI_LIST.add(ui);
  }

  public static void unregister(UI ui) {
    UI_LIST.remove(ui);
  }

  public static void broadcast(Command task) {
    for (UI ui : List.copyOf(UI_LIST)) {
      try {
        ui.access(task);
      } catch (Exception ignored) {
      }
    }
  }
}

9. Zusammenfassung

9.1 Bewertung des „Signal-per-SSE, Daten-per-REST“-Musters

Die Demonstration hat gezeigt, dass die Trennung von Signalisierung und Datenübertragung eine klare und nachvollziehbare Architektur ermöglicht. SSE eignet sich hervorragend dazu, Clients in Echtzeit über Veränderungen zu informieren, während REST die zuverlässige und flexible Bereitstellung der eigentlichen Daten übernimmt. Durch diese Arbeitsteilung entstehen geringe Kommunikationskosten bei gleichzeitig hoher Transparenz für die Nutzerinnen und Nutzer.

9.2 Didaktischer Nutzen und Wiederverwendbarkeit

Das vorgestellte Szenario ist bewusst einfach gehalten, um die Funktionsweise von SSE im Zusammenspiel mit einer Vaadin-Flow-Anwendung verständlich zu machen. Die Integration einer CLI zur Dateneingabe erleichtert es, die Kette von Ereignis, Signal und Abruf anschaulich nachzuvollziehen. Damit eignet sich das Beispiel nicht nur für technische Experimente, sondern auch für Schulungen, Workshops und Lehrmaterialien. Aufgrund seiner modularen Struktur kann es leicht erweitert oder in komplexere Projekte integriert werden, etwa als Grundlage für weiterführende Experimente im Umfeld des URL-Shortener-Open-Source-Projekts.

Total
0
Shares
Previous Post

Wie und warum man das klassische Observer-Pattern in Vaadin verwenden soll

Next Post

Testen mit Java – die vielfältige Welt der Test-Frameworks

Related Posts