
How to Build a Data Pipeline using Flink and Kafka

In this blog post, we are going to have a look at how to build a data pipeline using Flink and Kafka. But before we start, let's first understand what exactly these two technologies are.

Apache Flink is a stream processing framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

Apache Kafka is a distributed streaming platform with the following three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

We will assume that you have some basic knowledge of Kafka and its installation procedure. If you need help, please refer to the official guide to install and configure Apache Kafka. Once the installation is done, we can use the commands below to create topics in Kafka.

Kafka OUTPUT topic “dj_out”

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dj_out

Kafka INPUT topic “dj_in”

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dj_in
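
Note that the --zookeeper flag applies to older Kafka releases. On Kafka 2.2 and later, kafka-topics.sh also accepts --bootstrap-server, which points directly at the broker; assuming the broker runs on the default port 9092, the equivalent command for the input topic would be:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dj_in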

For the sake of this blog, we'll use the default configuration and default ports for Apache Kafka.
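
If Kafka isn't running yet, the scripts that ship with the Kafka distribution can bring up a single-node setup with that default configuration (ZooKeeper on port 2181, the broker on port 9092):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties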

Flink Usage

Apache Flink is a distributed system and requires compute resources in order to execute applications. Flink integrates with all common cluster resource managers such as Hadoop YARN, Apache Mesos, and Kubernetes, but it can also be set up to run as a standalone cluster. A few basic data sources and sinks are built into Flink and are always available. Beyond those, Flink provides various connectors:

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)

To add Flink to our project, we need to include the following Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.8.1</version>
</dependency>

Adding these dependencies will allow us to consume from and produce to Kafka topics. You can find the current version of Flink on Maven Central.
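
The code snippets that follow assume the imports below; the package locations are what I'd expect for Flink 1.8, so double-check them against the version you actually pull in:

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;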

Kafka String Consumer

To consume data from Kafka with Flink we need to provide the Kafka address and the topic name. We should also provide a group id, which is used to track the committed offsets so that we don't always read the data from the beginning.

Let’s create a static method that will make the creation of FlinkKafkaConsumer easier:

public static FlinkKafkaConsumer011<String> generateConsumerForTopic(
  String kafkaTopic, String kafkaAddress, String kafkaGroup) {

    // Minimal consumer configuration: broker address and consumer group
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaAddress);
    properties.setProperty("group.id", kafkaGroup);

    // SimpleStringSchema deserializes every record value as a plain String
    return new FlinkKafkaConsumer011<>(kafkaTopic, new SimpleStringSchema(), properties);
}

The above method takes a kafkaTopic, kafkaAddress, and kafkaGroup and generates a FlinkKafkaConsumer that consumes data from the specified topic as a String, since we have used SimpleStringSchema to deserialize the data.

The 011 in the class name refers to the Kafka version the connector targets (0.11).
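
By default, the consumer starts from the committed group offsets. If you need different behaviour, FlinkKafkaConsumer011 also lets you pick the start position explicitly; a small sketch using the method above:

FlinkKafkaConsumer011<String> consumer =
  generateConsumerForTopic("dj_in", "localhost:9092", "dj_group");
// Re-read the topic from the very beginning, ignoring committed offsets
consumer.setStartFromEarliest();
// ...or only consume records that arrive after the job starts
// consumer.setStartFromLatest();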

Kafka String Producer

To produce data to Kafka, we need to provide the Kafka address and the name of the topic we want to write the data to. Again, in a similar fashion, we can create a static method that helps us create producers for different topics:

public static FlinkKafkaProducer011<String> generateProducerForTopic(
  String kafkaTopic, String kafkaAddress) {

    // SimpleStringSchema serializes every outgoing String as the record value
    return new FlinkKafkaProducer011<>(kafkaAddress,
      kafkaTopic, new SimpleStringSchema());
}

The above method takes only two parameters, kafkaTopic and kafkaAddress, since there is no need to provide a group id when producing to a Kafka topic.
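
If you need to pass extra producer settings (for example acks or retries), FlinkKafkaProducer011 also offers a constructor that takes a Properties object instead of a plain broker address. A small sketch, keeping the same string schema:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaAddress);
// Wait for acknowledgement from all in-sync replicas before considering a write complete
properties.setProperty("acks", "all");

FlinkKafkaProducer011<String> producer =
  new FlinkKafkaProducer011<>(kafkaTopic, new SimpleStringSchema(), properties);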

String Stream Processing

Now that we have fully functional consumer and producer methods ready, let's try to process data from Kafka and then save the result back to Kafka.

In this example, we're going to convert each incoming Kafka message to lowercase and then write it back to Kafka.

For this purpose we need to create a custom MapFunction:

public class ConvertToLowerCase implements MapFunction<String, String> {
    @Override
    public String map(String s) {
        return s.toLowerCase();
    }
}
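
Since MapFunction is a functional interface, you can also pass a lambda or a method reference straight to map instead of writing a dedicated class; depending on your Flink version you may need an explicit type hint such as .returns(Types.STRING) for lambdas:

stringInputStream
    .map(String::toLowerCase)   // equivalent to new ConvertToLowerCase()
    .addSink(flinkKafkaProducer);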

After creating the function, we can use it in stream processing:

public static void main(String[] args) {
    String inTopic = "dj_in";
    String outTopic = "dj_out";
    String consumerGroup = "dj_group";
    String kafkaAddress = "localhost:9092";

    try {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read strings from the dj_in topic
        FlinkKafkaConsumer011<String> flinkKafkaConsumer = generateConsumerForTopic(inTopic, kafkaAddress, consumerGroup);
        DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);

        // Sink: write strings to the dj_out topic
        FlinkKafkaProducer011<String> flinkKafkaProducer = generateProducerForTopic(outTopic, kafkaAddress);

        // Pipeline: lowercase every message and forward it to the sink
        stringInputStream
            .map(new ConvertToLowerCase())
            .addSink(flinkKafkaProducer);

        // Trigger the job; nothing runs until execute() is called
        environment.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

The main method reads data from the dj_in topic, applies the map operation to the stream, and then saves the result to the dj_out topic in Kafka.
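
To try the pipeline end to end, you can use the console tools that ship with Kafka: publish a few messages to dj_in and watch the lowercased output appear on dj_out (the commands assume the default local broker on port 9092; on newer Kafka versions use --bootstrap-server for the producer as well):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dj_in
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dj_out --from-beginning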
