Tuesday, December 17, 2019

Ordering of Columns based on table name

import org.apache.spark.sql.{DataFrame, SparkSession}

// Reorders the columns of inputDF to match the column order of the given table.
def getOrderedDFBasedOnTableStructure(inputDF: DataFrame, tableName: String, spark: SparkSession): DataFrame = {
  val columnNames = spark.table(tableName).columns
  logInfo(s"table $tableName has columns in this order: ${columnNames.mkString(", ")}")
  inputDF.select(columnNames.head, columnNames.tail: _*)
}
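
A possible usage sketch, assuming a local SparkSession and a hypothetical temp view people_demo standing in for the real table. It shows the same reordering idea as the function above. This matters when a later write matches columns by position rather than by name, as DataFrameWriter.insertInto does.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ColumnOrderSketch {
  // Same idea as the function above, without the logging call.
  def reorder(inputDF: DataFrame, tableName: String, spark: SparkSession): DataFrame = {
    val columnNames = spark.table(tableName).columns
    inputDF.select(columnNames.head, columnNames.tail: _*)
  }

  // Builds a tiny example and returns the column names after reordering.
  def reorderedColumns(): Seq[String] = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ColumnOrderSketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical target: a temp view with columns (id, name).
    Seq((1, "a")).toDF("id", "name").createOrReplaceTempView("people_demo")

    // Incoming data has the same columns in a different order.
    val inputDF = Seq(("b", 2)).toDF("name", "id")

    reorder(inputDF, "people_demo", spark).columns.toSeq
  }

  def main(args: Array[String]): Unit =
    println(reorderedColumns().mkString(", ")) // id, name
}
```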

Saturday, December 7, 2019

Kafka: broker, producer, consumer

kafka

Download Kafka on the Cloudera VM using this link:
https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz


Run the following from inside the bin directory. Start ZooKeeper first:

./zookeeper-server-start.sh ../config/zookeeper.properties



Check that ZooKeeper is running (type stat at the telnet prompt):

telnet localhost 2181

stat

./kafka-server-start.sh ../config/server.properties


./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1

./kafka-topics.sh --list --zookeeper localhost:2181

./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning


cd /tmp/kafka-logs

./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 3

./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 3 --partitions 3

./kafka-topics.sh --describe --topic my_topic --zookeeper localhost:2181

==============================================================================================================================

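Each Kafka server process is called a broker. Usually there is one broker per machine,
but we can also run multiple brokers on one machine.


topic : a named stream of messages, for example twitterTopic. A topic is split into partitions.
partition : an ordered, append-only log of messages. It is the unit that is stored on a broker.


Producers write messages to a topic. Each partition is stored on a broker.
If we have 3 brokers and 3 partitions, the partitions are spread out so that each broker typically stores one of them.


The replication factor sets how many brokers store a copy of the same partition.

zookeeper : coordinates which broker is the master (the controller) and keeps track of all the nodes.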
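
To make the partition and replication idea concrete, here is a simplified sketch that spreads 3 partitions with replication factor 3 over brokers 0, 2 and 3. The object name and the rotate-by-one rule are assumptions for illustration only. Kafka's real assignment also randomises the starting broker, so the actual layout will differ.

```scala
object ReplicaPlacementSketch {
  /** For each partition, pick `replicationFactor` distinct brokers, rotating the start by one. */
  def assign(brokerIds: Seq[Int], partitions: Int, replicationFactor: Int): Map[Int, Seq[Int]] = {
    // A partition cannot have more replicas than there are brokers.
    require(replicationFactor <= brokerIds.size,
      "replication factor cannot exceed the number of brokers")
    (0 until partitions).map { p =>
      val replicas = (0 until replicationFactor).map(r => brokerIds((p + r) % brokerIds.size))
      p -> replicas
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val plan = assign(brokerIds = Seq(0, 2, 3), partitions = 3, replicationFactor = 3)
    plan.toSeq.sortBy(_._1).foreach { case (p, replicas) =>
      // In this sketch the first replica in the list plays the role of the leader.
      println(s"Partition $p -> leader ${replicas.head}, replicas ${replicas.mkString(",")}")
    }
    // Partition 0 -> leader 0, replicas 0,2,3
    // Partition 1 -> leader 2, replicas 2,3,0
    // Partition 2 -> leader 3, replicas 3,0,2
  }
}
```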


==============================================================================================================================
cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config

zookeeper-server-start.sh ../config/zookeeper.properties

(base) basanmachine:config basan$ cat server.properties

log.retention.hours=168
log.retention.check.interval.ms=300000



# Each broker must have a different id.
broker.id=0
# Address and port the broker listens on; Kafka's default port is 9092.
listeners=PLAINTEXT://:9092

# Where the broker stores the messages.
log.dirs=/tmp/kafka-logs

==============================================================================================================================
start the broker

(base) basanmachine:bin basan$ ./kafka-server-start.sh ../config/server.prop

On a successful start we get the messages below:

[2019-12-07 17:07:17,615] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:07:17,617] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

==============================================================================================================================

creating topic

(base) basanmachine:bin basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1


(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "my_topic".
(base) basanmachine:bin basan$

==============================================================================================================================
listing the topic

(base) basanmachine:bin basan$ ./kafka-topics.sh --list --zookeeper localhost:2181
my_topic
(base) basanmachine:bin basan$

==============================================================================================================================

producing messages to kafka
(base) basanmachine:bin basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$

./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
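
The console producer can also be replaced by a small program. A minimal sketch using the Kafka Java client from Scala, assuming the kafka-clients library is on the classpath and a broker is listening on localhost:9092. The object name and the sample messages are made up for illustration.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SimpleProducerSketch {
  // Minimal settings: where the broker is and how to serialize keys and values.
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](producerProps("localhost:9092"))
    try {
      // Send a few messages without a key to the topic created earlier.
      Seq("basan", "test", "test2").foreach { msg =>
        producer.send(new ProducerRecord[String, String]("my_topic", msg))
      }
    } finally {
      // close() waits for pending sends to complete before returning.
      producer.close()
    }
  }
}
```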


==============================================================================================================================

consuming messages from kafka

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning


(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$
(base) basanmachine:bin basan$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
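
The deprecation warning above points to the new consumer, which connects to a broker (bootstrap server) instead of ZooKeeper. A minimal sketch with the Java client from Scala, assuming the kafka-clients library is on the classpath, a broker on localhost:9092, and a made-up group id notes-demo-group. poll(long) matches the 1.1.0 client used in these notes; newer clients prefer the Duration variant.

```scala
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object SimpleConsumerSketch {
  def consumerProps(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)
    // Start from the oldest retained message when the group has no committed
    // offset yet, similar to --from-beginning on the console consumer.
    props.setProperty("auto.offset.reset", "earliest")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  def main(args: Array[String]): Unit = {
    val consumer = new KafkaConsumer[String, String](consumerProps("localhost:9092", "notes-demo-group"))
    try {
      consumer.subscribe(Collections.singletonList("my_topic"))
      // Poll a fixed number of times so that the sketch terminates.
      for (_ <- 1 to 10) {
        val records = consumer.poll(1000L)
        records.asScala.foreach { r =>
          println(s"partition=${r.partition} offset=${r.offset} value=${r.value}")
        }
      }
    } finally {
      consumer.close()
    }
  }
}
```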

==============================================================================================================================

checking stored content

(base) basanmachine:kafka-logs basan$ ls
cleaner-offset-checkpoint
log-start-offset-checkpoint
meta.properties
recovery-point-offset-checkpoint
replication-offset-checkpoint


==============================================================================================================================

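RabbitMQ vs Kafka

In a message queue such as RabbitMQ, a message is gone as soon as it has been read (consumed and acknowledged).
In Kafka the message stays until it expires (see log.retention.hours in server.properties), so it can be read again.

ActiveMQ behaves the same way as RabbitMQ.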
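
The difference can be sketched with two tiny in-memory classes. Both class names are made up for illustration and neither models the real systems. The queue drops a message once it is received. The log keeps everything and lets each reader choose the offset to read from.

```scala
import scala.collection.mutable

object QueueVsLogSketch {
  /** Queue semantics: a delivered message is removed. */
  final class SimpleQueue {
    private val items = mutable.Queue.empty[String]
    def publish(msg: String): Unit = items.enqueue(msg)
    def receive(): Option[String] = if (items.isEmpty) None else Some(items.dequeue())
  }

  /** Log semantics: messages stay; a reader asks for everything from an offset. */
  final class SimpleLog {
    private val entries = mutable.ArrayBuffer.empty[String]
    def append(msg: String): Unit = { entries += msg; () }
    def readFrom(offset: Int): List[String] = entries.drop(offset).toList
  }

  def main(args: Array[String]): Unit = {
    val queue = new SimpleQueue
    Seq("basan", "test").foreach(queue.publish)
    println(queue.receive()) // Some(basan)
    println(queue.receive()) // Some(test)
    println(queue.receive()) // None: nothing left to read again

    val log = new SimpleLog
    Seq("basan", "test").foreach(log.append)
    println(log.readFrom(0)) // List(basan, test)
    println(log.readFrom(0)) // List(basan, test): a second read sees the same messages
    println(log.readFrom(1)) // List(test)
  }
}
```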


==============================================================================================================================

Creating 3 brokers on one machine: for each extra broker we have to change the broker id, the broker port and the log location.

/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config
(base) basanmachine:config basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config
(base) basanmachine:config basan$ cp server.properties server2.properties
(base) basanmachine:config basan$ cp server.properties server3.properties
(base) basanmachine:config basan$ vi server2.properties

Change these settings:
broker.id=2
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs2


(base) basanmachine:config basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config
(base) basanmachine:config basan$ vi server3.properties
broker.id=3
listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kafka-logs3


(base) basanmachine:bin basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
Start ZooKeeper first, then the three brokers (each in its own terminal):
./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties
./kafka-server-start.sh ../config/server2.properties
./kafka-server-start.sh ../config/server3.properties

[2019-12-07 17:44:42,822] INFO [ReplicaAlterLogDirsManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)

[2019-12-07 17:45:23,601] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:45:03,132] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:45:03,133] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)


(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
./kafka-topics.sh --create --topic multipletopic --zookeeper localhost:2181 --replication-factor 3 --partitions 3

./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181



(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic multipletopic --zookeeper localhost:2181 --replication-factor 3 --partitions 3
Created topic "multipletopic".
(base) basanmachine:bin basan$


(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,3,2
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3
Topic: multipletopic Partition: 2 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
(base) basanmachine:bin basan$

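Observe the folders multipletopic-0, multipletopic-1 and multipletopic-2 below. With replication factor 3 on 3 brokers, every broker stores a replica of each of the 3 partitions, so all three folders appear in this broker's log directory.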


(base) basanmachine:bin basan$ cd /tmp/kafka-logs
(base) basanmachine:kafka-logs basan$ ls
cleaner-offset-checkpoint
log-start-offset-checkpoint
meta.properties
multipletopic-0
multipletopic-1
multipletopic-2
my_topic-0
recovery-point-offset-checkpoint
replication-offset-checkpoint
(base) basanmachine:kafka-logs basan$


(base) basanmachine:multipletopic-0 basan$ pwd
/tmp/kafka-logs/multipletopic-0
(base) basanmachine:multipletopic-0 basan$ ls
00000000000000000000.index 00000000000000000000.timeindex
00000000000000000000.log leader-epoch-checkpoint
(base) basanmachine:multipletopic-0 basan$


========================================================================================================

If we bring any broker down, it drops out of the in-sync replica (Isr) list, which can be seen with the describe command. Here broker 3 was stopped:


(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,2
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0
Topic: multipletopic Partition: 2 Leader: 2 Replicas: 3,2,0 Isr: 2,0

Now I will restart the broker; the in-sync replica list will change again.


(base) basanmachine:bin basan$ ./kafka-server-start.sh ../config/server3.properties


(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,2,3
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3
Topic: multipletopic Partition: 2 Leader: 2 Replicas: 3,2,0 Isr: 2,0,3
(base) basanmachine:bin basan$

Observe that broker 3 is back in the in-sync replica list of every partition. Partition 2 is still led by broker 2.
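
To spot which replicas are out of sync, compare the Replicas and Isr columns. A small sketch that parses describe lines like the ones above and reports the brokers missing from the Isr. The object name and the regular expression are assumptions based on the output format shown in these notes.

```scala
object IsrCheckSketch {
  private val Line =
    """Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)""".r

  /** Returns (partition, replicas missing from the Isr) for one describe line, if it parses. */
  def missingFromIsr(describeLine: String): Option[(Int, Seq[Int])] =
    Line.findFirstMatchIn(describeLine).map { m =>
      val replicas = m.group(3).split(",").map(_.toInt).toSeq
      val isr = m.group(4).split(",").map(_.toInt).toSet
      (m.group(1).toInt, replicas.filterNot(isr.contains))
    }

  def main(args: Array[String]): Unit = {
    // Lines copied from the describe output taken while broker 3 was down.
    val lines = Seq(
      "Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,2",
      "Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0",
      "Topic: multipletopic Partition: 2 Leader: 2 Replicas: 3,2,0 Isr: 2,0")
    lines.flatMap(missingFromIsr).foreach { case (p, missing) =>
      println(s"partition $p: out-of-sync broker(s) ${missing.mkString(",")}")
    }
    // partition 0: out-of-sync broker(s) 3
    // partition 1: out-of-sync broker(s) 3
    // partition 2: out-of-sync broker(s) 3
  }
}
```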


========================================================================================================

/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin

./kafka-topics.sh --create --topic basan --zookeeper localhost:2181 --replication-factor 1 --partitions 1

./kafka-topics.sh --list --zookeeper localhost:2181

./kafka-console-producer.sh --broker-list localhost:9092 --topic basan

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning



(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic basan --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "basan".
(base) basanmachine:bin basan$ ./kafka-topics.sh --list --zookeeper localhost:2181
basan
multipletopic
my_topic
(base) basanmachine:bin basan$


(base) basanmachine:bin basan$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic basan
>basan
test
test2


./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning



(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$
(base) basanmachine:bin basan$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
basan
test
test2

Spark DStream

We process continuous streams of messages from sources like Kafka or Kinesis.
Streaming data is converted to RDDs by defining a micro-batch interval.

Flink, in contrast, is a true streaming solution.


A DStream consists of a sequence of RDDs. Spark performs its operations on those RDDs.

Messages arriving within one interval are grouped to form an RDD. The sequence of RDDs makes up the DStream.

Within a DStream, Spark does batch processing on the individual RDDs.


For a DStream we need a StreamingContext. A StreamingContext is built from the SparkContext and the batch interval.


Socket = IP + port

=========================

The command below listens on port 9999 and can be used to produce messages:
nc -lk 9999



(base) basanmachine:SparkProjects basan$ nc -lk 9999
test
test2
basan
patil
dummy producer basan


(base) basanmachine:~ basan$ spark-shell --master local[2]
2019-12-07 15:26:24 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[2], app id = local-1575712589217).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala>


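import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// 5-second micro-batches on top of the shell's existing SparkContext
val ssc = new StreamingContext(sc, Seconds(5))
// Read lines from the socket opened by nc
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
// Count each word within the current batch only
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
// Nothing is processed until the context is started
ssc.start()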




scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._

scala> val ssc= new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@350342e0

scala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@53079ae6

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@423762ae

scala> val pairs = words.map(word =>(word,1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@4e9695cf

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@2351255a

scala> wordCounts.print()

scala>



scala> ssc.start()

scala> 2019-12-07 15:37:12 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:12 WARN  BlockManager:66 - Block input-0-1575713232400 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713235000 ms
-------------------------------------------
(test2,1)
(test,1)
(patil,1)
(basan,1)

-------------------------------------------
Time: 1575713240000 ms
-------------------------------------------

-------------------------------------------
Time: 1575713245000 ms
-------------------------------------------

2019-12-07 15:37:25 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:25 WARN  BlockManager:66 - Block input-0-1575713245200 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713250000 ms
-------------------------------------------
(test2,1)
(3,1)
(tet,1)

2019-12-07 15:37:34 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:34 WARN  BlockManager:66 - Block input-0-1575713253800 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713255000 ms
-------------------------------------------
(basana,1)

2019-12-07 15:37:39 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:39 WARN  BlockManager:66 - Block input-0-1575713259600 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713260000 ms
-------------------------------------------
(patil,1)
(basan,1)

-------------------------------------------

==================================================================================================================
pushing messages from socket
(base) basanmachine:SparkProjects basan$ nc -lk 9999
basan patil
test test2
test2 tet 3
basana
patil basan


==================================================================================================================
Observe the results: each output only aggregates the messages of its own 5-second batch and does not take previous
results into account. This is the behaviour of a stateless transformation.


stateless transformation : the operation is applied to each RDD separately and ignores the previous results.

==================================================================================================================
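stateful transformation :
The result also depends on earlier batches. We can aggregate from the start of the stream, or over the last few RDDs by defining a window.

updateStateByKey(updateFn) is a stateful function. It works on a DStream of key-value pairs.

Pair RDD : an RDD whose elements are (key, value) tuples.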
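
A minimal sketch of a running word count with updateStateByKey, written as a standalone application rather than for the shell. The object name, the helper updateFn and the checkpoint path /tmp/spark-streaming-checkpoint are assumptions. It reads from the same nc socket as the stateless example.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCountSketch {
  // Merge this batch's counts for one word into the running total kept by Spark.
  def updateFn(newCounts: Seq[Int], runningTotal: Option[Int]): Option[Int] =
    Some(newCounts.sum + runningTotal.getOrElse(0))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCountSketch")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Stateful operations require a checkpoint directory.
    ssc.checkpoint("/tmp/spark-streaming-checkpoint")

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Unlike reduceByKey, the counts now accumulate across batches.
    val runningCounts = pairs.updateStateByKey[Int](updateFn _)
    runningCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With the socket input shown earlier, basan would be reported with count 2 once both lines containing it have arrived, instead of 1 in each separate batch.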

==================================================================================================================

Window Operations :
We only care about the last fixed time interval (the window).

summary function : adds the result of the RDD entering the window
inverse function : subtracts the result of the RDD leaving the window

Checkpointing is needed to track the changes.

countByWindow is a stateful operation.

lines.countByWindow(Seconds(10), Seconds(2))

The window size is 10 seconds and the sliding interval is 2 seconds.
With a 2-second batch interval, every 2 seconds it removes the oldest RDD and adds the new RDD.


sliding interval : has to be a multiple of the StreamingContext batch interval (as does the window size), because a window cannot contain half an RDD.
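
A sketch of a windowed word count using a summary and an inverse function via reduceByKeyAndWindow. The batch interval of 2 seconds is an assumption chosen so that the 10-second window and 2-second slide are whole multiples of it. The object name, the helper isValidWindow and the checkpoint path are also made up for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedWordCountSketch {
  /** Window and slide lengths must be whole multiples of the batch interval. */
  def isValidWindow(batchSec: Int, windowSec: Int, slideSec: Int): Boolean =
    windowSec % batchSec == 0 && slideSec % batchSec == 0

  def main(args: Array[String]): Unit = {
    val batchSec = 2
    val windowSec = 10
    val slideSec = 2
    require(isValidWindow(batchSec, windowSec, slideSec), "window and slide must be multiples of the batch interval")

    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowedWordCountSketch")
    val ssc = new StreamingContext(conf, Seconds(batchSec))
    // The inverse-function variant requires checkpointing.
    ssc.checkpoint("/tmp/spark-streaming-checkpoint")

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    val windowedCounts = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b, // summary function: add counts entering the window
      (a: Int, b: Int) => a - b, // inverse function: subtract counts leaving the window
      Seconds(windowSec),
      Seconds(slideSec))
    windowedCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With the 5-second batch interval used in the earlier shell session, a 2-second slide would not be valid, which is what the isValidWindow check expresses.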

==================================================================================================================

If the producer is much faster than the consumer, data loss can happen with a socket stream. That is where a buffering
mechanism like Kafka helps.
Many streaming solutions put Kafka in front of the stream processor for this reason.

A Twitter stream can serve as an example.
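
A sketch of the same word count reading from Kafka instead of a socket, assuming the spark-streaming-kafka-0-10 integration library is on the classpath, a broker on localhost:9092 and the topic my_topic created earlier. The object name, the helper words and the group id notes-demo-group are made up for illustration.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaWordCountSketch {
  // Split a message into words, dropping empty strings caused by repeated spaces.
  def words(line: String): Seq[String] = line.split(" ").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCountSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "notes-demo-group",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // Kafka retains the messages, so a slow consumer falls behind instead of losing data.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("my_topic"), kafkaParams))

    stream.map(_.value)
      .flatMap(words)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```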