A Tale of Two Runtimes: Setting Up Your Local Java Development with Flink

It was the best of builds, it was the worst of builds. One ran effortlessly in the IDE, the other stubbornly broke at runtime. Welcome to the tale of two runtimes: your local development setup and the Flink cluster where your job is ultimately meant to live.

In this post, we’ll walk through how to set up your Flink Java project so you can develop, test, and run it locally with ease. And much like Charles Dickens’s works, Apache Flink can feel a bit intimidating if you’re just getting started…

Flink is a powerful framework for building scalable, fault-tolerant, real-time stream processing applications. Together, we’ll go through how to run and test your jobs locally. Whether you’re debugging from your IDE, spinning up a local cluster, or writing your first test cases, this post walks through the nuts and bolts of setting up a smooth development workflow with Flink.

Our use case

We’ll be working with a rather simple use case in our example, just for the sake of having something to work with. Let’s imagine that we’re consuming a stream of quotes by various authors, and we wish to keep track of each author’s quotes during the past day. We want to have this count updated every minute and, after each day, to have it reset again (for whatever reason that might be).

The quotes are served from a Kafka cluster, and we have to send the updates to another one.

This is a typical Flink use case, so we’ve decided to implement it with Flink. Let’s dive right in!

All code shown can be found in this repository: https://github.com/alex-charos/quotes-job

Setup

Let’s take a quick look at how you would go about setting up your local project.

Maven archetype

There’s a handy archetype that Maven provides to get you started, so we’ll use this one:

mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.20.1 \
-DgroupId=gr.charos.literature \
-DartifactId=quotesjob \
-Dversion=0.1 \
-Dpackage=gr.charos.literature \
-DinteractiveMode=false

One of the main advantages of using the Maven archetype is that it comes pre-configured with the shade goal bound to the package phase, which builds a Flink job artifact (JAR) that can be deployed to a Flink cluster.

The archetype is also kind enough to create a main class that acts as the entry point for our job.

package gr.charos.literature;
//... Omitting imports

public class DataStreamJob {

  public static void main(String[] args) throws Exception {
    // Sets up the execution environment, which is the main entry point
    // to building Flink applications.
    final StreamExecutionEnvironment env =
      StreamExecutionEnvironment.getExecutionEnvironment();

    /*
     * Here, you can start creating your execution plan for Flink.
     *
     * Start with getting some data from the environment, like
     * 	env.fromSequence(1, 10);
     *
     * then, transform the resulting DataStream<Long> using operations
     * like
     * 	.filter()
     * 	.flatMap()
     * 	.window()
     * 	.process()
     *
     * and many more.
     * Have a look at the programming guide:
     *
     * https://nightlies.apache.org/flink/flink-docs-stable/
     *
     */

    // Execute program, beginning computation.
    env.execute("Flink Java API Skeleton");
  }
}

You might be tempted to rename it to something of your choice. Just make sure that you also do so in the pom.xml file!

<execution>
  <phase>package</phase>
  <goals>
    <goal>shade</goal>
  </goals>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <artifactSet>
      <excludes>
        <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
        <exclude>com.google.code.findbugs:jsr305</exclude>
        <exclude>org.slf4j:*</exclude>
        <exclude>org.apache.logging.log4j:*</exclude>
      </excludes>
    </artifactSet>
    <filters>
      <filter>
        <!-- Do not copy the signatures in the META-INF folder.
             Otherwise, this might cause SecurityExceptions when using the JAR. -->
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
    <transformers>
      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
      <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
        <mainClass>gr.charos.literature.DataStreamJob</mainClass>
      </transformer>
    </transformers>
  </configuration>
</execution>

Flink & Java Versions

One thing we need to keep in mind is Flink’s support for Java versions, which is documented in the official Flink documentation; make sure your project’s Java version matches what your Flink version supports.

Kafka Cluster(s)

It’s always useful to set up a local Kafka broker to play with your implementation, along with some sort of UI where you can view the broker’s state and perhaps manipulate it. The docker-compose file below works just fine for that purpose.

Heads up: this docker-compose.yml file includes a service (init-kafka) whose sole purpose is to create the topics we’ll be using.

version: "2.2"
name: quotes-cluster
services:
  quotes_broker:
    image: apache/kafka:latest
    hostname: quotes_broker
    container_name: quotes_broker
    expose:
      - '9092'
      - '9093'
    ports:
      - '9092:9092'
      - '9093:9093'
    networks:
      - quotes-cluster-network
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://127.0.0.1:9093,PLAINTEXT_HOST://host.docker.internal:9092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@quotes_broker:29093'
      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: 'test-cluster-id'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    healthcheck:
      test: [ "CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9093 --list" ]
      interval: 5s
      timeout: 10s
      retries: 5

  quote-broker-ui:
    image: tchiotludo/akhq
    container_name: quote-broker-ui
    ports:
      - "8082:8080"  # AKHQ Web UI
    depends_on:
      - quotes_broker
    networks:
      - quotes-cluster-network
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            source-cluster:
              properties:
                bootstrap.servers: "quotes_broker:9092"
        

  init-kafka:
    image: confluentinc/cp-kafka:6.1.1
    depends_on:
      - quotes_broker
    networks:
      - quotes-cluster-network
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server quotes_broker:9092 --list
      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server quotes_broker:9092 --create --if-not-exists --topic authored-quotes --replication-factor 1 --partitions 1
      kafka-topics --bootstrap-server quotes_broker:9092 --create --if-not-exists --topic authored-quote-counts --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server quotes_broker:9092 --list
      "

networks:
  quotes-cluster-network:
    driver: bridge

Our implementation

The implementation itself is largely outside the scope of this article, so I’ll just describe it broadly here to give a general idea of how we put the solution together. As mentioned earlier, this is a very simple Flink use case.

Source Stream

Our source is a Kafka cluster where messages come serialized as JSON in the format below:

{
  "author":"Charles Dickens", 
  "quote":"A day wasted on others is not wasted on one's self."
}
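
On the Java side, this maps naturally onto a small record whose accessors match the JSON fields (a minimal sketch; the exact class lives in the linked repository):

// Minimal sketch of the source record. Jackson (used by JsonDeserializationSchema)
// binds the "author" and "quote" JSON fields straight onto the record components.
public record Quote(String author, String quote) {}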

We’ll be deserializing these messages into the Quote record, using Flink’s JsonDeserializationSchema class to read from the source topic.

Properties kafkaProps = new Properties();

Config config = getConfig(args);

kafkaProps
  .setProperty("bootstrap.servers", config.sourceBootstrapServers());

kafkaProps
  .setProperty("group.id", config.sourceGroupId());

JsonDeserializationSchema<Quote> jsonFormat =
  new JsonDeserializationSchema<>(Quote.class);

FlinkKafkaConsumer<Quote> kafkaConsumer =
  new FlinkKafkaConsumer<>(config.sourceTopic(), jsonFormat, kafkaProps);

DataStream<Quote> textStream = env.addSource(kafkaConsumer);
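
The Config object used above is just a small holder for the job’s parameters. Here’s a hypothetical sketch whose accessors mirror the calls in the snippets; the real getConfig(args) helper in the repository may build it differently:

// Hypothetical parameter holder; the accessor names match the calls used in the snippets.
public record Config(
    String sourceBootstrapServers,
    String sourceGroupId,
    String sourceTopic,
    String destinationBootstrapServers,
    String destinationTopic) {}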

To wire up the Kafka source and JSON deserialization, we need to pull in a couple of dependencies (one for the Kafka connector and one for Flink’s JSON support):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>${flink.connector.kafka.version}</version>
</dependency>

The Process Function

We wish to keep track of each author’s quote count for the past day, so we’ll opt for a simple implementation: we’ll key our source by the author’s name and keep their quotes in state.

We’ll be windowing by processing time. The requirement calls for 1-minute windows, since that’s how often we want to update our counts (the snippet below uses 5-second windows, which is handy for seeing results quickly when testing locally).

DataStream<AuthorQuotesCount> authorQuotes = 
    textStream
      .keyBy(Quote::author)
      .window(
        TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
      .process(new QuoteCountFunction());

Our QuoteCountFunction implementation will update the state with the quotes observed within that window and emit the latest count for the author in question.

public class QuoteCountFunction 
  extends ProcessWindowFunction<Quote, AuthorQuotesCount, String, TimeWindow> {
  private transient ValueState<AuthorQuotes> currentState;

  private final StateTtlConfig ttlConfig = 
    StateTtlConfig
      .newBuilder(Duration.ofDays(1)) // Keep last day
      .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .cleanupInRocksdbCompactFilter(1000)
    .build();

  @Override
  public void open(OpenContext openContext) {
    ValueStateDescriptor<AuthorQuotes> mState =
      new ValueStateDescriptor<>("state", AuthorQuotes.class);
    
    mState.enableTimeToLive(ttlConfig);
    
    currentState = getRuntimeContext().getState(mState);

  }
    
  @Override
  public void process(String key,
                      Context context,
                      Iterable<Quote> elements,
                      Collector<AuthorQuotesCount> out) throws Exception {

    AuthorQuotes current = currentState.value();

    if (current == null) {
      current = new AuthorQuotes(key);
    }

    for (Quote element : elements) {
      current.getQuotes().add(element.quote());
    }

    out.collect(new AuthorQuotesCount(key, current.getQuotes().size()));
    currentState.update(current);
  }
}

The state we store is a simple structure (which might just as well have been a map!):

public class AuthorQuotes {
    private final String author;
    private List<String> quotes;
    public AuthorQuotes(String author) {
        this.author = author;
    }

    public String getAuthor() {
        return author;
    }


    public List<String> getQuotes() {
        if (quotes == null) {
            quotes = new ArrayList<>();
        }
        return quotes;
    }
}

Sink

Eventually, we’ll publish to our sink something along the lines of the following record.

public record AuthorQuotesCount(String author, Integer quotesCount) {}

…and we’ll serialize it as JSON!

Properties destKafkaProps = new Properties();

destKafkaProps
  .setProperty("bootstrap.servers", config.destinationBootstrapServers());

// Flink's JsonSerializationSchema (from flink-json) is the counterpart of the
// JsonDeserializationSchema we used on the source side.
JsonSerializationSchema<AuthorQuotesCount> jsonSerialization =
  new JsonSerializationSchema<>();

FlinkKafkaProducer<AuthorQuotesCount> kafkaProducer =
  new FlinkKafkaProducer<>(
    config.destinationTopic(), jsonSerialization, destKafkaProps);

authorQuotes.addSink(kafkaProducer);

Running in our IDE

One of the best things about working with Flink is how easy it is to start a job directly from your IDE and debug the initial teething issues that are bound to come up, especially when you’re only getting started.

The first time I tried to run my main method from my IDE, I got the following error:

Error: Unable to initialize main class gr.charos.literature.DataStreamJob
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema

This happened because our project declares the Flink libraries with the provided scope. That’s fair play, of course: our job is supposed to run on a Flink cluster, where exactly those libraries will already be on the runtime classpath. To run and debug our job from the IDE, we need to include the provided-scoped libraries on the classpath; in IntelliJ, for instance, you can enable the “Add dependencies with provided scope to classpath” option in the run configuration.

This makes writing our first iteration amazingly simple and allows us to start iterating fast on our job!

Running outside our IDE helps (to a point) reduce the “works on my PC” cases, and it also helps when someone simply wants to spin up our job without worrying about the actual codebase. There are also some subtle differences in the execution environment that Flink launches when it runs locally.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

When running locally, you will get a LocalStreamEnvironment here, as the Flink documentation explains:

The LocalEnvironment is a handle to local execution for Flink programs. Use it to run a program within a local JVM – standalone or embedded in other programs. The local environment is instantiated via the method ExecutionEnvironment.createLocalEnvironment(). By default, it will use as many local threads for execution as your machine has CPU cores (hardware contexts). You can alternatively specify the desired parallelism. The local environment can be configured to log to the console using enableLogging()/disableLogging().
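
If you’d like the Flink web UI while running from the IDE as well, you can ask for a local environment with the UI enabled explicitly. A small sketch, assuming you add the flink-runtime-web dependency to your classpath:

// Sketch: local execution with an embedded web UI (by default at http://localhost:8081).
// Requires flink-runtime-web on the classpath; handy for watching the job while debugging.
Configuration conf = new Configuration();
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);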

Setting up a local cluster

There are two ways to set up your local Flink cluster. Depending on various factors, you may choose one or the other; I’ll describe both approaches here, so pick whichever best suits your setup.

In my opinion, the Flink binary will be helpful whichever way you decide (you’ll want the flink CLI for submitting jobs anyway), so it makes sense to also use it to set up your cluster.

You can download Flink from the official website (https://flink.apache.org/downloads/) or install it using SDKMAN. Note that SDKMAN may be slightly behind the latest released version.

Option 1: Flink binary

The binary download gives you a nifty little script that lets you spin up a cluster quickly:

# In case of SDKMAN: ~/.sdkman/candidates/flink/current/bin/start-cluster.sh
${path-to-flink}/bin/start-cluster.sh

Option 2: docker-compose

There’s a thorough guide in the official Flink documentation (https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#flink-with-docker-compose). Session mode, where you can submit jobs via the web UI or the CLI, seems like the way to go:

version: "2.2"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager        

  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2        

Deploying your job

You can then run your job once you build it:

$ mvn clean package

$ flink run target/quotesjob-0.1.jar

Navigating to http://localhost:8081 will give you an overview of your cluster and the jobs running.

Unit (…and Integration, I suppose…) Testing

Ahhh.. Without a doubt, my favourite part!

Cue here the standard lecture that we are obliged to give (or receive) in terms of what a Unit actually is and what the extent of a unit test ought to be before it actually becomes an integration test…

We should definitely keep our business code as isolated as possible and test it independently of Flink. However, when we want to test functions that use internal state and timers, i.e., interact more with Flink’s runtime, things can get a bit tricky. For such cases, Flink provides us with test harnesses.

In any case, I think Flink’s training repository is helpful here: it’s worth reviewing its exercises and tests, if only to see how smoothly they are implemented and to get some inspiration for setting up your own testing codebase.

Harness

Let’s say we’ve written a ProcessWindowFunction that keeps state or works with timers. How do we test it without spinning up an entire Flink job? That’s where Flink’s test harnesses come in; they are covered in the testing section of the official Flink documentation.

We’ll first add the required dependencies to our pom.xml file:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils-junit</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
</dependency>

There is a huge variety of harness test examples in Flink’s GitHub repository.

Harness Setup

I think the biggest pain with harness testing is setting the harness up. There is some plumbing required to get it going, but once you do, it is really powerful: it lets you simulate various conditions (processing time, for example) and allows for thorough testing.

We’ll be testing a keyed operator with a single input, so we’ll use the corresponding harness class that Flink provides:

private KeyedOneInputStreamOperatorTestHarness<String, Quote, AuthorQuotesCount> testHarness;

This harness will be instantiated with a window operator and the keying information (a key extractor and the key’s type information).

Setting up the WindowOperator instance is what troubled me most when attempting to use the harnesses for the first time, mainly due to the many arguments that we need to pass to it…!

TL;DR: What the WindowOperator needs

  • Type of window (e.g., TumblingEventTimeWindows.of(...))
  • Key selector function
  • Key serializer
  • State descriptor
  • The wrapped ProcessWindowFunction (via InternalIterableProcessWindowFunction)
  • Trigger (e.g., ProcessingTimeTrigger.create())
  • Allowed lateness and an optional side output for late data

ListStateDescriptor<Quote> stateDesc =
  new ListStateDescriptor<>(
    "window-contents",
    TypeInformation.of(Quote.class)
      .createSerializer(new ExecutionConfig())); // serializer for the buffered window elements

WindowOperator <String,  // Key Type 
                Quote,   // IN type
                Iterable<Quote>, //IN type of Iterables that will be passed 
                AuthorQuotesCount, // OUT type
                TimeWindow // Type of window
                > windowOperator =
  new WindowOperator<>(
    TumblingEventTimeWindows.of(Duration.ofMillis(100)), 
    new TimeWindow.Serializer(), // Time window serializer
    Quote::author, // Our Key function 
    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
      new ExecutionConfig()), // Key serializer (String in our case)
    stateDesc, // See above
    new InternalIterableProcessWindowFunction<>(
      new QuoteCountFunction()), // wrapping our process function 
    ProcessingTimeTrigger.create(), // Processing time trigger
    0, //    Will not work 
    null //  with lateness today..!
);
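
With the operator in place, the harness itself is wired up from the operator plus the keying information mentioned above, and opened before the test runs. A sketch of what typically lives in the test’s setup method:

// Wire the window operator into the keyed test harness (typically in a @BeforeEach/@Before method).
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
    windowOperator,                   // the operator under test
    Quote::author,                    // key selector, must match the operator's keying
    BasicTypeInfo.STRING_TYPE_INFO);  // key type information

// Initializes state backends and timer services, and calls open() on our function.
testHarness.open();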

We can now start writing unit tests to ensure our function works as expected.

@Test
public void testProcessCount() throws Exception {
  // manipulate processing time
  testHarness.setProcessingTime(0);

  // push elements and their timestamp
  testHarness.processElement(
    new StreamRecord<>(
      new Quote("Orwell","Freedom is the right to tell people what they do not want to hear."),
      10));
  testHarness.processElement(
    new StreamRecord<>(
      new Quote("Huxley","After silence, that which comes nearest to expressing the inexpressible is music."),
      20));
  testHarness.processElement(
    new StreamRecord<>(
      new Quote("Orwell","Happiness can exist only in acceptance."),
      50));

  testHarness.processElement(
    new StreamRecord<>(
      new Quote("Dickens","There are dark shadows on the earth, but its lights are stronger in the contrast."),
      100));
  testHarness.processElement(
    new StreamRecord<>(
      new Quote("Steinbeck","Power does not corrupt. Fear corrupts... perhaps the fear of a loss of power."),
      100));

  // first window complete, start of the second window
  testHarness.setProcessingTime(100);

  assertEquals(2, testHarness.getRecordOutput().size());

  long orwellRecordsCount = 
    testHarness.getRecordOutput().stream()
      .filter(p->p.getValue().author().equals("Orwell")).count();
        
  long huxleyRecordsCount = 
    testHarness.getRecordOutput().stream()
      .filter(p->p.getValue().author().equals("Huxley")).count();
        
  long dickensRecordsCount = 
    testHarness.getRecordOutput().stream()
      .filter(p->p.getValue().author().equals("Dickens")).count();
        
  long steinbeckRecordsCount = 
    testHarness.getRecordOutput().stream()
      .filter(p->p.getValue().author().equals("Steinbeck")).count();

  assertEquals(1, orwellRecordsCount);
  assertEquals(1, huxleyRecordsCount);
  assertEquals(0, dickensRecordsCount);
  assertEquals(0, steinbeckRecordsCount);


  int orwellQuotes = testHarness.getRecordOutput().stream()
    .filter(
      p->p.getValue().author().equals("Orwell"))
    .findFirst().get().getValue().quotesCount();
        

  int huxleyQuotes = testHarness.getRecordOutput().stream()
    .filter(
      p->p.getValue().author().equals("Huxley"))
    .findFirst().get().getValue().quotesCount();


  assertEquals(2, orwellQuotes);
  assertEquals(1, huxleyQuotes);

}

Another approach to unit testing (or integration testing, if we’re being pedantic) is to use Flink’s own MiniCluster. This essentially allows you to work directly with the same API that you use when defining the job, making it much more straightforward to set up.

This approach is especially helpful if you use event-time windows. Honestly, the official and other online documentation is relatively weak on how to set it up to work with processing time, which is where test harnesses really shine.
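
To give an idea of the plumbing involved, this is roughly the scaffolding in its JUnit 4 flavour, following the pattern from Flink’s testing documentation (flink-test-utils also ships a MiniClusterExtension for JUnit 5):

// Shared embedded Flink cluster, started once per test class.
// A test can then build a normal DataStream pipeline (swapping Kafka for a bounded
// source and a collecting test sink) and simply call env.execute().
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
    new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(2)
            .setNumberTaskManagers(1)
            .build());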

When to pick which approach in Unit Testing?

  • Use test harnesses for low-level control, especially with processing time or detailed operator testing.
  • Use MiniCluster for broader integration tests or event-time testing, where you’re testing more of the job graph.

Summing up

IDE setup? ✅

Local cluster? ✅

Docker Compose? ✅

Unit & Integration tests? ✅

We’re all set!

We’ve gone through the core steps of setting up a local development environment for Flink: running and debugging directly from the IDE, spinning up a local or Docker-based cluster, and writing both unit and integration tests using Flink’s test harnesses and MiniCluster.

This setup should give you a solid foundation for developing and testing your Flink jobs with confidence. There’s a lot more to explore from here, but you now have all the essentials to start experimenting, iterating, and building your first streaming applications.

Happy coding!

Active in the Java Community
Alexandros Charos is an active member of the Java community and regularly speaks at conferences, such as JCON and OpenBlend Slovenia. In this article, they share insights to support ongoing exchange and learning in the ecosystem.
JCON OpenBlend Slovenia is a community-driven conference by the Slovenian OpenBlend and SIOUG user groups in collaboration with JCON, uniting Java developers for exchange, networking, and practical learning.
