Step by Step: Installing Apache Kafka and Communicating with Spark

Hi Guys,

So far we have learned YARN and Hadoop, and mainly focused on Spark, practicing several machine learning algorithms either with the scikit-learn package in Python or with MLlib in PySpark. Today, let's take a break from Spark and MLlib and learn something new with Apache Kafka.

I. Background

At its core, Apache Kafka is a distributed, partitioned, replicated, real-time commit log service. It provides the functionality of a messaging system, but with a unique design. Here's the LinkedIn Kafka paper.

1. Here are some basic concepts you need to understand:

  • A stream of Messages of a particular type is defined as a Topic. A Message is defined as a payload of bytes, and a Topic is a category or feed name to which Messages are published.
  • A Producer is any process that can publish Messages to a Topic.
  • The published Messages are then stored at a set of servers called Brokers, which together form a Kafka Cluster.
  • A Consumer can subscribe to one or more Topics and consume the published Messages by pulling data from the Brokers (see the short code sketch after this list for these roles in action).
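
To make these roles concrete, here is a minimal sketch in Python using the third-party kafka-python package (pip install kafka-python). It is illustrative only: it assumes a broker on localhost:9092 like the one we set up below, and newer versions of the library may require a newer broker than the 0.8.2 release used in this post.

from kafka import KafkaProducer, KafkaConsumer

# Producer: publishes a Message (a payload of bytes) to the "test" Topic
producer = KafkaProducer(bootstrap_servers="localhost:9092")  # 9092 = the Broker
producer.send("test", b"hello kafka")
producer.flush()  # block until the Broker has the Message

# Consumer: subscribes to the Topic and pulls Messages from the Broker
consumer = KafkaConsumer("test",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest",  # start from the oldest Message
                         consumer_timeout_ms=5000)      # stop iterating after 5s idle
for message in consumer:
    print(message.value)  # b'hello kafka'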

2. Usage of ZooKeeper in Kafka: ZooKeeper exists to coordinate and facilitate distributed systems, and that is exactly why Kafka uses it: for managing and coordinating the Kafka brokers (each broker registers itself with ZooKeeper, and clients can discover the brokers through it; the broker is pointed at ZooKeeper via the zookeeper.connect setting in config/server.properties). Notice that in the Hadoop ecosystem, ZooKeeper is likewise used for cluster management. In short, ZooKeeper mainly solves the problem of reliable distributed coordination.

II. Install Apache Kafka on Ubuntu

The following procedure is based on this YouTube lecture, together with my own comments and the points of confusion I hit when trying it myself.

Environment versions:

Ubuntu 15.04

Apache Kafka 0.8.2.0 (Scala 2.10 build, kafka_2.10-0.8.2.0)

1. Download and unpack the Kafka tar file from here

tar -zxvf kafka_2.10-0.8.2.0.tgz
mv kafka_2.10-0.8.2.0 /usr/local/kafka

2. Start Zookeeper

Installing ZooKeeper manually is complicated, so Kafka ships with integrated shell scripts to start and stop its own ZooKeeper server (bin/zookeeper-server-start.sh and bin/zookeeper-server-stop.sh).

Start the ZooKeeper server:

bin/zookeeper-server-start.sh config/zookeeper.properties


Note: if the terminal seems stuck at the last statement, "INFO binding to port 0.0.0.0/0.0.0.0:2181", don't worry. This means the ZooKeeper server started correctly and is waiting for your next move.
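
For reference, the config/zookeeper.properties file we just passed in is tiny. Assuming the stock file shipped in this tarball (adjust if you have customized anything), its contents are:

# where ZooKeeper keeps its snapshot data
dataDir=/tmp/zookeeper
# the port clients (the Kafka broker and the console consumer) connect to
clientPort=2181
# 0 disables the per-IP connection limit (fine for local testing only)
maxClientCnxns=0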

3. Start Kafka Broker

We can open another terminal and start the broker.

bin/kafka-server-start.sh config/server.properties


If the terminal hangs at "INFO new leader is 0", the Kafka broker has started correctly, and you will see that it is listening on port 9092.
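
The broker's behavior is driven by config/server.properties. As a sketch, these are the stock settings most relevant to this walkthrough (shipped defaults; treat the exact values as assumptions if you have edited the file):

# unique id of this broker within the cluster
broker.id=0
# the port the broker accepts producer/consumer connections on
port=9092
# where the broker persists message data on disk
log.dirs=/tmp/kafka-logs
# the ZooKeeper address the broker registers with
zookeeper.connect=localhost:2181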

4. Topic Creation

Let's create a topic named "test" in another terminal. Here --replication-factor 1 means each message is kept on only one broker (we only run one), and --partitions 1 gives the topic a single partition:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


5. List the topics

bin/kafka-topics.sh --list --zookeeper localhost:2181


Since we have created only one topic, the output will simply show "test".
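
If you prefer to check this from code, here is a hedged sketch using the same third-party kafka-python package as the earlier concept sketch (again, newer versions of the library may expect a newer broker than 0.8.2):

from kafka import KafkaConsumer

# connect to the broker we started on port 9092
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
# topics() returns the set of topic names known to the cluster
print(consumer.topics())  # expected: {'test'}
consumer.close()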

6. Start the Producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test


Note: the port here is 9092, the port the Kafka broker is listening on.

Now the producer is waiting for your input.

7. Start the Consumer

We can open yet another terminal to kick off the consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning


Note: the consumer connects to port 2181, the port the ZooKeeper server runs on, because this old-style console consumer discovers the brokers through ZooKeeper. The --from-beginning flag makes it read the topic from the earliest available message.

8. Interactive, real-time testing

You can type whatever text you want at the producer terminal and press "Enter" when you are finished. The same message will be delivered to the consumer terminal. Isn't it fun?!


9. Try the WordCount streaming example with Spark

9.1 Open a new terminal and go to your Spark directory:

cd /usr/local/spark

9.2 In Spark, run the command shown below.

"test" is the topic and "4" is the number of consumer threads (for background on Kafka consumer threads, please refer to this); normally, each consumer gets one thread. (The example code is here.)

bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 testgroup test 4

9.3 The final result of the Kafka producer communicating with Spark is shown below.

It clearly shows that Spark is doing streaming processing: whatever you type on the Kafka producer side arrives in Spark and gets word-counted in real time.
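
By the way, since this blog usually works in PySpark, here is a minimal PySpark sketch of the same word count. It assumes a Spark 1.x installation and that the spark-streaming-kafka assembly jar is passed to spark-submit via --jars; the script name is hypothetical, and unlike the Scala example (which uses a windowed count with checkpointing), this sketch keeps a plain per-batch count for simplicity.

from operator import add

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonKafkaWordCount")
ssc = StreamingContext(sc, 2)  # 2-second micro-batches

# receiver-based stream through ZooKeeper: group "testgroup",
# topic "test" read with 4 consumer threads (mirrors the CLI arguments)
kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181", "testgroup", {"test": 4})

# each record is a (key, value) pair; the text we typed is the value
lines = kafka_stream.map(lambda record: record[1])
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(add))
counts.pprint()

ssc.start()
ssc.awaitTermination()

Save it as, say, kafka_wordcount.py and run it with bin/spark-submit --jars <path to the spark-streaming-kafka assembly jar> kafka_wordcount.py while the producer terminal from step 6 is still open.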

Congratulations! Now you know how to install Apache Kafka and even how to make it communicate with Apache Spark!

References

1. http://www.infoq.com/articles/apache-kafka

2. http://tech.lalitbhatt.net/2014/07/apache-kafka-tutorial.html

3. http://www.sparkfu.com/2015/01/spark-kafka-cassandra-part-1.html
