Saturday, December 7, 2019

Kafka, broker, producer, consumer

kafka

Download Kafka in the Cloudera VM using this link:
https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz


Inside the bin directory, start ZooKeeper first:

./zookeeper-server-start.sh ../config/zookeeper.properties



telnet localhost 2181

stat

./kafka-server-start.sh ../config/server.properties


./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1

./kafka-topics.sh --list --zookeeper localhost:2181

./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning


cd /tmp/kafka-logs

./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 3

./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 3 --partitions 3

./kafka-topics.sh --describe --topic my_topic --zookeeper localhost:2181

==============================================================================================================================

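Each Kafka server process is called a broker.
Usually one broker runs per machine, but we can also run multiple brokers on a single machine.


topic: a named stream of messages, for example twitterTopic. A topic is divided into partitions.
partition: an ordered log of messages; it is the unit that gets stored on a broker.


Producers publish messages to a topic. Each partition is stored on a broker.
If we have a 3-broker cluster and a topic with 3 partitions, the partitions are spread across the brokers, typically one per broker.


The replication factor controls how many brokers store a copy of the same partition.

ZooKeeper: keeps track of all the nodes (brokers) and coordinates the election of the master (the controller broker).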


==============================================================================================================================
cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config

../bin/zookeeper-server-start.sh ../config/zookeeper.properties

(base) basanmachine:config basan$ cat server.properties

log.retention.hours=168
log.retention.check.interval.ms=300000



# Each broker must have a unique id.
broker.id=0

# Address and port the broker listens on (9092 by default).
listeners=PLAINTEXT://:9092

# Directory where the broker stores the messages.
log.dirs=/tmp/kafka-logs
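
The two retention values are easier to read once converted to days and minutes. Here is a minimal sketch of that arithmetic. It works on a scratch copy of the two lines rather than the real server.properties, and the helper name get_prop is only for illustration:

```shell
# Scratch copy of the two retention settings shown in server.properties.
props=$(mktemp)
cat > "$props" <<'EOF'
log.retention.hours=168
log.retention.check.interval.ms=300000
EOF

# get_prop KEY prints the value of KEY from the key=value file.
get_prop() {
  grep "^$1=" "$props" | cut -d= -f2
}

# 168 hours / 24 = days a message is kept before it becomes eligible for deletion.
retention_days=$(( $(get_prop log.retention.hours) / 24 ))
# 300000 ms / 60000 = minutes between retention checks.
check_minutes=$(( $(get_prop log.retention.check.interval.ms) / 60000 ))

echo "messages are kept for $retention_days days"
echo "retention is checked every $check_minutes minutes"
rm -f "$props"
```

With the values above this reports 7 days and 5 minutes.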

==============================================================================================================================
start the broker

(base) basanmachine:bin basan$ ./kafka-server-start.sh ../config/server.prop

On a successful start we get the messages below:

[2019-12-07 17:07:17,615] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:07:17,617] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

==============================================================================================================================

creating topic

(base) basanmachine:bin basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1


(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "my_topic".
(base) basanmachine:bin basan$

==============================================================================================================================
listing the topic

(base) basanmachine:bin basan$ ./kafka-topics.sh --list --zookeeper localhost:2181
my_topic
(base) basanmachine:bin basan$

==============================================================================================================================

producing messages to kafka
(base) basanmachine:bin basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$

./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic


==============================================================================================================================

consuming messages from kafka

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning


(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$
(base) basanmachine:bin basan$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
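
The warning above suggests passing bootstrap-server instead of zookeeper. A minimal sketch of that form follows. It assumes the broker from this walkthrough is listening on localhost:9092 and that the snippet is run from the Kafka bin directory. The variable names are only for illustration:

```shell
# Assumptions: broker reachable on localhost:9092, current directory is Kafka's bin.
BOOTSTRAP_SERVER="localhost:9092"
TOPIC="my_topic"

# New-consumer form: talk to the broker directly instead of to ZooKeeper.
CONSUMER_CMD="./kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC --from-beginning"
echo "$CONSUMER_CMD"

# Run it only when the script is actually present in the current directory.
if [ -x ./kafka-console-consumer.sh ]; then
  $CONSUMER_CMD
fi
```

The consumer now connects to the broker port (9092) rather than the ZooKeeper port (2181).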

==============================================================================================================================

checking stored content

(base) basanmachine:kafka-logs basan$ ls
cleaner-offset-checkpoint
log-start-offset-checkpoint
meta.properties
recovery-point-offset-checkpoint
replication-offset-checkpoint


==============================================================================================================================

RabbitMQ vs Kafka

In RabbitMQ a message is gone from the queue as soon as it has been read, but Kafka keeps the message until it expires (see log.retention.hours), so it can be read again.

ActiveMQ behaves the same way as RabbitMQ.


==============================================================================================================================

creating 3 brokers: for each additional broker we have to change the broker id, the broker port and the log location

(base) basanmachine:config basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config
(base) basanmachine:config basan$ cp server.properties server2.properties
(base) basanmachine:config basan$ cp server.properties server3.properties
(base) basanmachine:config basan$ vi server2.properties

change the following settings:
broker.id=2
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs2


(base) basanmachine:config basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config
(base) basanmachine:config basan$ vi server3.properties

change the following settings:
broker.id=3
listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kafka-logs3
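
The same three changes can be scripted instead of made by hand in vi. This is a rough sketch, not part of the original steps. It works in a scratch directory on a minimal stand-in for server.properties, and the function name make_broker_config is made up for illustration:

```shell
# Scratch directory with a minimal stand-in for config/server.properties.
workdir=$(mktemp -d)
cat > "$workdir/server.properties" <<'EOF'
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
EOF

# make_broker_config ID PORT SUFFIX
# Copies server.properties to server<SUFFIX>.properties, replacing the broker id,
# the listener port and the log directory. "#*" also handles a commented-out
# listeners line.
make_broker_config() {
  sed -e "s|^broker.id=.*|broker.id=$1|" \
      -e "s|^#*listeners=.*|listeners=PLAINTEXT://localhost:$2|" \
      -e "s|^log.dirs=.*|log.dirs=/tmp/kafka-logs$3|" \
      "$workdir/server.properties" > "$workdir/server$3.properties"
}

make_broker_config 2 9093 2
make_broker_config 3 9094 3

cat "$workdir/server2.properties" "$workdir/server3.properties"
```

To use it on the real files, point the paths at the Kafka config directory instead of the scratch directory.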


(base) basanmachine:bin basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
start zookeeper first and then the brokers, each in its own terminal
./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties
./kafka-server-start.sh ../config/server2.properties
./kafka-server-start.sh ../config/server3.properties

[2019-12-07 17:44:42,822] INFO [ReplicaAlterLogDirsManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)

[2019-12-07 17:45:23,601] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:45:03,132] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:45:03,133] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)


(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
./kafka-topics.sh --create --topic multipletopic --zookeeper localhost:2181 --replication-factor 3 --partitions 3

./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181



(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic multipletopic --zookeeper localhost:2181 --replication-factor 3 --partitions 3
Created topic "multipletopic".
(base) basanmachine:bin basan$


(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,3,2
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3
Topic: multipletopic Partition: 2 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
(base) basanmachine:bin basan$

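Observe the folders multipletopic-0, multipletopic-1 and multipletopic-2 below, one per partition. Since the replication factor is 3 and we have 3 brokers, every broker's log directory holds a replica of all 3 partitions.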


(base) basanmachine:bin basan$ cd /tmp/kafka-logs
(base) basanmachine:kafka-logs basan$ ls
cleaner-offset-checkpoint
log-start-offset-checkpoint
meta.properties
multipletopic-0
multipletopic-1
multipletopic-2
my_topic-0
recovery-point-offset-checkpoint
replication-offset-checkpoint
(base) basanmachine:kafka-logs basan$


(base) basanmachine:multipletopic-0 basan$ pwd
/tmp/kafka-logs/multipletopic-0
(base) basanmachine:multipletopic-0 basan$ ls
00000000000000000000.index 00000000000000000000.timeindex
00000000000000000000.log leader-epoch-checkpoint
(base) basanmachine:multipletopic-0 basan$


========================================================================================================

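If we bring any broker down, the in-sync replica (Isr) list shrinks, which can be seen with the describe command. Below, broker 3 has been stopped: it is missing from every Isr list, and the leader of partition 2 has moved from broker 3 to broker 2.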


(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,2
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0
Topic: multipletopic Partition: 2 Leader: 2 Replicas: 3,2,0 Isr: 2,0

Now I restart the broker, and the in-sync replica list changes again.


(base) basanmachine:bin basan$ ./kafka-server-start.sh ../config/server3.properties


(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,2,3
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3
Topic: multipletopic Partition: 2 Leader: 2 Replicas: 3,2,0 Isr: 2,0,3
(base) basanmachine:bin basan$

Observe that broker 3 is back in the in-sync replica list. The leader of partition 2 is still broker 2.
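
Comparing the Replicas and Isr columns by eye gets tedious with more partitions. Here is a small sketch that does the comparison with awk. The sample text is the describe output captured above while one broker was down. On a live cluster the output of the describe command would be piped in instead:

```shell
# Describe output from above, taken while broker 3 was down.
describe_output='Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,2
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0
Topic: multipletopic Partition: 2 Leader: 2 Replicas: 3,2,0 Isr: 2,0'

# Print the partition number whenever Isr has fewer entries than Replicas.
under_replicated=$(printf '%s\n' "$describe_output" | awk '
  $3 == "Partition:" {
    for (i = 1; i <= NF; i++) {
      if ($i == "Replicas:") replicas = $(i + 1)
      if ($i == "Isr:")      isr = $(i + 1)
    }
    if (split(isr, a, ",") < split(replicas, b, ","))
      print $4
  }')

echo "under-replicated partitions:" $under_replicated
```

For the sample above all three partitions (0, 1 and 2) are reported. kafka-topics.sh also accepts --under-replicated-partitions together with --describe, which filters the listing in a similar way.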


========================================================================================================

/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin

./kafka-topics.sh --create --topic basan --zookeeper localhost:2181 --replication-factor 1 --partitions 1

./kafka-topics.sh --list --zookeeper localhost:2181

./kafka-console-producer.sh --broker-list localhost:9092 --topic basan

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning



(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic basan --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "basan".
(base) basanmachine:bin basan$ ./kafka-topics.sh --list --zookeeper localhost:2181
basan
multipletopic
my_topic
(base) basanmachine:bin basan$


(base) basanmachine:bin basan$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic basan
>basan
test
test2


./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning



(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$
(base) basanmachine:bin basan$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
basan
test
test2
