Please download Kafka in the Cloudera VM using the link:
Inside the bin directory:
./zookeeper-server-start.sh ../config/zookeeper.properties
telnet localhost 2181
./kafka-server-start.sh ../config/server.properties
./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
./kafka-topics.sh --list --zookeeper localhost:2181
./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning
cd /tmp/kafka-logs
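With more partitions or replicas (use a topic name that does not exist yet; replication factor 3 needs 3 running brokers):
./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 3
./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 3 --partitions 3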
./kafka-topics.sh --describe --topic my_topic --zookeeper localhost:2181
Each Kafka server is called a broker.
We can run multiple brokers on one machine.
topic: a named stream of messages, for example twitterTopic. A topic is split into partitions.
partition: producers write messages to partitions, and each partition is stored on a broker.
If we have a 3-broker cluster and a topic with 3 partitions, each broker can store one of the partitions.
The replication factor sets how many copies of each partition are kept, each copy on a different broker.
zookeeper: keeps track of all the brokers and elects the controller (the "master" broker), which chooses the leader of each partition.
cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config
../bin/zookeeper-server-start.sh ../config/zookeeper.properties
(base) basanmachine:config basan$ cat server.properties
# Each broker must have a unique id.
broker.id=0
# Address and port the broker listens on (9092 by default).
listeners=PLAINTEXT://:9092
# Directory where the broker stores the messages.
log.dirs=/tmp/kafka-logs
start the broker
(base) basanmachine:bin basan$ ./kafka-server-start.sh ../config/server.prop
on a successful start we get messages like these:
[2019-12-07 17:07:17,615] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:07:17,617] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
creating topic
(base) basanmachine:bin basan$ pwd
./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "my_topic".
(base) basanmachine:bin basan$
listing the topic
(base) basanmachine:bin basan$ ./kafka-topics.sh --list --zookeeper localhost:2181
(base) basanmachine:bin basan$
producing messages to kafka
(base) basanmachine:bin basan$ pwd
(base) basanmachine:bin basan$
./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
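consuming messages from kafka (this --zookeeper form starts the old, deprecated consumer; the new consumer takes --bootstrap-server localhost:9092 instead of --zookeeper localhost:2181)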
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning
(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$
(base) basanmachine:bin basan$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
checking stored content
(base) basanmachine:kafka-logs basan$ ls
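RabbitMQ vs Kafka
In RabbitMQ, as soon as a message is read from the queue it is gone. Kafka keeps messages after they are read, until they are deleted by the retention settings, so they can be read again (for example with --from-beginning).
ActiveMQ behaves like RabbitMQ here.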
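A toy model of the difference, not Kafka's real storage format: a partition behaves like an append-only log, and each consumer only tracks its own offset, so reading never removes a record. The temporary file and the produce/consume function names are hypothetical, chosen for illustration.

```shell
#!/usr/bin/env bash
# Toy model only: a "partition" is an append-only text file, one record per line.
# Reading does not delete anything; each consumer just starts at its own offset.
log_file=$(mktemp)

# produce MESSAGE: append one record to the end of the log.
produce() {
  printf '%s\n' "$1" >> "$log_file"
}

# consume OFFSET: print every record from OFFSET (0-based) to the end.
# OFFSET 0 is the toy equivalent of --from-beginning.
consume() {
  tail -n +"$(( $1 + 1 ))" "$log_file"
}

produce "hello"
produce "kafka"
produce "world"
consume 0   # first consumer reads all three records
consume 0   # reading again still returns all three
consume 2   # a consumer that already processed offsets 0-1 gets only the last record
```

Real Kafka does eventually delete old data according to its retention settings (log.retention.hours in server.properties); the toy model keeps everything forever.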
creating 3 brokers: each broker needs its own broker.id, port and log location
(base) basanmachine:config basan$ pwd
(base) basanmachine:config basan$ cp server.properties server2.properties
(base) basanmachine:config basan$ cp server.properties server3.properties
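(base) basanmachine:config basan$ vi server2.properties
change broker.id=2 (plus its own listener port and log.dirs)
(base) basanmachine:config basan$ pwd
(base) basanmachine:config basan$ vi server3.properties
change broker.id=3 (plus its own listener port and log.dirs)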
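Instead of editing the copies by hand in vi, the three changes can be scripted. This is a sketch that assumes the stock server.properties layout (a broker.id= line, a listeners= line that may still be commented out, and a log.dirs= line). The function name is hypothetical, and the ports 9093/9094 and directories /tmp/kafka-logs-2 and /tmp/kafka-logs-3 in the usage lines are example values, not taken from the original setup.

```shell
#!/usr/bin/env bash
# make_broker_config SRC DEST ID PORT LOGDIR   (hypothetical helper)
# Copy a broker config, giving the copy its own id, listener port and log
# directory so several brokers can run on one machine without clashing.
make_broker_config() {
  local src=$1 dest=$2 id=$3 port=$4 logdir=$5
  # "#*listeners=" also matches the commented-out default line, but not
  # "#advertised.listeners=", because the match is anchored at line start.
  sed -e "s|^broker\.id=.*|broker.id=${id}|" \
      -e "s|^#*listeners=.*|listeners=PLAINTEXT://:${port}|" \
      -e "s|^log\.dirs=.*|log.dirs=${logdir}|" \
      "$src" > "$dest"
}

# Example usage from the config directory (port and log directory values are examples):
#   make_broker_config server.properties server2.properties 2 9093 /tmp/kafka-logs-2
#   make_broker_config server.properties server3.properties 3 9094 /tmp/kafka-logs-3
```

Writing to a new file with a redirect avoids sed -i, whose syntax differs between macOS and Linux.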
(base) basanmachine:bin basan$ pwd
start zookeeper and then the three brokers, each in its own terminal
./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties
./kafka-server-start.sh ../config/server2.properties
./kafka-server-start.sh ../config/server3.properties
[2019-12-07 17:44:42,822] INFO [ReplicaAlterLogDirsManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2019-12-07 17:45:23,601] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:45:03,132] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:45:03,133] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
./kafka-topics.sh --create --topic multipletopic --zookeeper localhost:2181 --replication-factor 3 --partitions 3
./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic multipletopic --zookeeper localhost:2181 --replication-factor 3 --partitions 3
Created topic "multipletopic".
(base) basanmachine:bin basan$
(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,3,2
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3
Topic: multipletopic Partition: 2 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
(base) basanmachine:bin basan$
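The replica lists above follow a pattern: leaders rotate round-robin over the brokers and the other replicas are offset from the leader. The sketch below is a simplified bash re-implementation modeled on Kafka's rack-unaware replica assignment, written for illustration and not Kafka's own code. Kafka picks the start index and the shift at random; here they are arguments, and with start=0 and shift=1 the sketch happens to reproduce the layout shown for brokers 0, 2 and 3. The function name is hypothetical.

```shell
#!/usr/bin/env bash
# assign_replicas PARTITIONS REPLICATION_FACTOR START SHIFT BROKER_ID...
# Simplified model of rack-unaware replica assignment. START and SHIFT are
# random in Kafka; they are arguments here so the output is repeatable.
assign_replicas() {
  local partitions=$1 rf=$2 start=$3 next_shift=$4
  shift 4
  local -a brokers=("$@")
  local n=${#brokers[@]}
  local p j first second_shift list

  if (( rf > n )); then
    echo "replication factor $rf is larger than the $n available brokers" >&2
    return 1
  fi

  for (( p = 0; p < partitions; p++ )); do
    # Bump the shift every time the partition id wraps around the broker list.
    if (( p > 0 && p % n == 0 )); then
      next_shift=$(( next_shift + 1 ))
    fi
    # First replica (the preferred leader): walk the broker list round-robin.
    first=$(( (p + start) % n ))
    list=${brokers[first]}
    # Remaining replicas: offsets 1..n-1 from the first, so no broker repeats.
    for (( j = 0; j < rf - 1; j++ )); do
      second_shift=$(( 1 + (next_shift + j) % (n - 1) ))
      list+=",${brokers[(first + second_shift) % n]}"
    done
    echo "Partition: $p Replicas: $list"
  done
}

assign_replicas 3 3 0 1 0 2 3
```

Because the real start index is random, creating the same topic again can produce a different layout; what stays the same is that leaders and replicas are spread evenly across the brokers.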
Observe the folders multipletopic-0, multipletopic-1 and multipletopic-2 in /tmp/kafka-logs: with 3 partitions, replication factor 3 and 3 brokers, every broker stores a replica of every partition.
(base) basanmachine:bin basan$ cd /tmp/kafka-logs
(base) basanmachine:kafka-logs basan$ ls
(base) basanmachine:kafka-logs basan$
(base) basanmachine:multipletopic-0 basan$ pwd
(base) basanmachine:multipletopic-0 basan$ ls
00000000000000000000.index 00000000000000000000.timeindex
00000000000000000000.log leader-epoch-checkpoint
(base) basanmachine:multipletopic-0 basan$
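Each partition replica a broker holds is a directory named after the topic and partition number (multipletopic-0 and so on) under log.dirs, containing the .log segment and the .index/.timeindex files seen above. A small sketch that lists which partitions of a topic one broker stores; the function name is hypothetical and it only inspects directory names, not the data inside.

```shell
#!/usr/bin/env bash
# partitions_on_broker LOG_DIR TOPIC   (hypothetical helper)
# Print the partition numbers of TOPIC that have a directory in LOG_DIR,
# one per line, sorted numerically.
partitions_on_broker() {
  local log_dir=$1 topic=$2 dir name
  for dir in "$log_dir/$topic"-*/; do
    [ -d "$dir" ] || continue          # the glob matched nothing
    name=$(basename "$dir")
    name=${name#"$topic"-}             # strip the "<topic>-" prefix
    case $name in
      ''|*[!0-9]*) continue ;;         # not a partition of this topic, e.g. "my-topic-0" for topic "my"
    esac
    echo "$name"
  done | sort -n
}

# Example: partitions_on_broker /tmp/kafka-logs multipletopic
```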
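If we bring down any broker (here broker 3, started with server3.properties), the in-sync replica (Isr) lists shrink, which can be seen with the describe command. Broker 3 drops out of every Isr, and partition 2, which broker 3 was leading, gets a new leader (broker 2):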
(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,2
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0
Topic: multipletopic Partition: 2 Leader: 2 Replicas: 3,2,0 Isr: 2,0
Now restart that broker; the in-sync replica lists change again:
(base) basanmachine:bin basan$ ./kafka-server-start.sh ../config/server3.properties
(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,2,3
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3
Topic: multipletopic Partition: 2 Leader: 2 Replicas: 3,2,0 Isr: 2,0,3
(base) basanmachine:bin basan$
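Observe that broker 3 is back in the in-sync replica list of every partition. Partition 2 is still led by broker 2, not by its preferred leader (the first broker in its Replicas list, broker 3).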
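With many partitions, comparing the Replicas and Isr columns by eye gets tedious. A sketch that reads describe output on stdin and prints only the partitions whose in-sync replica list is shorter than the replica list; the function name is hypothetical and it assumes the field layout shown above. kafka-topics.sh can also report this itself when --under-replicated-partitions is added to --describe.

```shell
#!/usr/bin/env bash
# under_replicated   (hypothetical helper)
# Read `kafka-topics.sh --describe` output on stdin and print each partition
# whose Isr list has fewer brokers than its Replicas list.
under_replicated() {
  awk '
    # Per-partition lines contain "Partition:"; the summary line only has "PartitionCount:".
    /Partition:/ {
      part = ""; reps = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Partition:") part = $(i + 1)
        if ($i == "Replicas:")  reps = $(i + 1)
        if ($i == "Isr:")       isr  = $(i + 1)
      }
      if (split(isr, a, ",") < split(reps, b, ","))
        printf "partition %s: replicas %s, in sync %s\n", part, reps, isr
    }'
}

# Example:
#   ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181 | under_replicated
```

On the output taken while broker 3 was down it reports all three partitions; on the output after the restart it prints nothing.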
./kafka-topics.sh --create --topic basan --zookeeper localhost:2181 --replication-factor 1 --partitions 1
./kafka-topics.sh --list --zookeeper localhost:2181
./kafka-console-producer.sh --broker-list localhost:9092 --topic basan
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning
(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic basan --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "basan".
(base) basanmachine:bin basan$ ./kafka-topics.sh --list --zookeeper localhost:2181
(base) basanmachine:bin basan$
(base) basanmachine:bin basan$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic basan
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning
(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
-bash: cd: /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin: No such file or directory
(base) basanmachine:~ basan$ /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
-bash: /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin: is a directory
(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$
(base) basanmachine:bin basan$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].