In this blog, we are going to look at how to build a data pipeline using Apache Flink and Apache Kafka. Before we start, let's first understand what exactly these two technologies are.
Apache Flink is a stream processing framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Apache Kafka is a distributed streaming platform with the following three key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.
We assume that you have some basic knowledge of Kafka and its installation procedure; if not, please refer to the official guide to install and configure Apache Kafka. Once the installation is done, we can use the commands below to create the topics in Kafka.
Kafka OUTPUT topic “dj_out”
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dj_out
Kafka INPUT topic “dj_in”
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dj_in
For the sake of this blog, we’ll use the default configuration and default ports for Apache Kafka.
Flink Usage
Apache Flink is a distributed system that ships with connectors for a number of third-party systems, which can be used as data sources and/or sinks (a minimal job skeleton is sketched right after the list):
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
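Whichever connector is used, a Flink streaming job always has the same shape: create an execution environment, attach a source, transform the stream, attach a sink, and call execute(). Below is a minimal, self-contained sketch of that shape, using an in-memory source and a print sink purely for illustration; the rest of this post replaces them with Kafka.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobShapeSketch {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Attach a source (an in-memory collection here, a Kafka consumer later in this post)
        DataStream<String> input = env.fromElements("Hello", "Flink");

        // 3. Transform the stream
        DataStream<String> upperCased = input.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });

        // 4. Attach a sink (stdout here, a Kafka producer later in this post)
        upperCased.print();

        // 5. Trigger execution
        env.execute("job-shape-sketch");
    }
}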
To add Flink to our project, the following Maven dependencies need to be added to the pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.8.1</version>
</dependency>
Adding these dependencies will allow us to consume from and produce to Kafka topics. You can find the current version of Flink on Maven Central.
Kafka String Consumer
To consume data from Kafka with Flink, we need to provide the Kafka address and the topic name. We should also provide a consumer group id so that offsets are tracked and we won’t always read the data from the beginning.
Let’s create a static method that will make the creation of FlinkKafkaConsumer easier:
public static FlinkKafkaConsumer011<String> generateConsumerForTopic(
        String kafkaTopic, String kafkaAddress, String kafkaGroup) {

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaAddress);
    properties.setProperty("group.id", kafkaGroup);

    FlinkKafkaConsumer011<String> djConsumer = new FlinkKafkaConsumer011<>(
            kafkaTopic, new SimpleStringSchema(), properties);
    return djConsumer;
}
The above method takes the topic name, the Kafka address, and the consumer group id, and returns a FlinkKafkaConsumer011 that reads each Kafka record as a plain String using SimpleStringSchema.
The 011 in the class name refers to the Kafka version the connector targets (0.11).
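By default, the consumer resumes from the offsets committed for the given group id. If different behaviour is needed, the start position can be set explicitly on the consumer instance; here is a minimal sketch using the helper above (the topic, address, and group values are the ones used throughout this post):

FlinkKafkaConsumer011<String> consumer =
        generateConsumerForTopic("dj_in", "localhost:9092", "dj_group");

// Default behaviour: resume from the offsets committed for this consumer group
consumer.setStartFromGroupOffsets();

// Alternatively, re-read the whole topic from the beginning:
// consumer.setStartFromEarliest();

// Or only read records produced after the job starts:
// consumer.setStartFromLatest();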
Kafka String Producer
To produce data to Kafka, we need to provide the Kafka address and the topic name. Again, let’s create a static method to make creating the producer easier:
public static FlinkKafkaProducer011<String> generateProducerForTopic(
        String kafkaTopic, String kafkaAddress) {
    return new FlinkKafkaProducer011<>(kafkaAddress, kafkaTopic, new SimpleStringSchema());
}
The above method takes only two parameters, kafkaTopic and kafkaAddress; no consumer group id is needed when producing messages.
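If you need to pass additional producer settings, FlinkKafkaProducer011 can also be constructed with a Properties object instead of a plain broker address. A minimal sketch follows; the retries setting is shown purely as an illustrative tuning option, not something this pipeline requires:

Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "localhost:9092");
// Illustrative Kafka producer option; adjust or omit as needed
producerProperties.setProperty("retries", "3");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "dj_out", new SimpleStringSchema(), producerProperties);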
String Stream Processing
Now that we have fully functional consumer and producer methods, let’s process data from Kafka and save the result back to Kafka.
In this example, we’re going to convert each Kafka message to lowercase and then write it back to Kafka.
For this purpose, we need to create a custom MapFunction:
public class ConvertToLowerCase implements MapFunction<String, String> {
    @Override
    public String map(String s) {
        return s.toLowerCase();
    }
}
After creating the function, we can use it in stream processing:
public static void main(String[] args) {
    String inTopic = "dj_in";
    String outTopic = "dj_out";
    String consumerGroup = "dj_group";
    String kafkaAddress = "localhost:9092";

    try {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer011<String> flinkKafkaConsumer =
                generateConsumerForTopic(inTopic, kafkaAddress, consumerGroup);
        DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);

        FlinkKafkaProducer011<String> flinkKafkaProducer =
                generateProducerForTopic(outTopic, kafkaAddress);

        stringInputStream
                .map(new ConvertToLowerCase())
                .addSink(flinkKafkaProducer);

        environment.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
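As a side note, since MapFunction is a functional interface, the map step in the main method above could also be written as a lambda. A small sketch, assuming the stringInputStream and flinkKafkaProducer defined above and the Types class from org.apache.flink.api.common.typeinfo; the returns(Types.STRING) hint helps Flink's type extraction when lambdas are used:

stringInputStream
        .map(value -> value.toLowerCase())
        .returns(Types.STRING)
        .addSink(flinkKafkaProducer);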
The application’s main method reads data from the dj_in topic, converts each message to lowercase, and writes the result to the dj_out topic in Kafka.