Powered By Blogger

Saturday, December 7, 2019

Spark Dstream

We will be processing continuous messages like kafka or Keinesis
Streaming data will be convered to RDD, by defining microbatch interval.

Flink is true streaming solution.


DStream will consist of set of RDD. Spark performas operations on RDD.

messages at certain interval will be aggregated, to form RDD. Set of RDD will create Dstream.

Within Dstreams, spark does batch processing on individual RDDs.


For Dstream we need to have StreamingContext. StreamingContext is built using sparkContext and the timeinterval.


Socket = ip +port

=========================

Below command can be used to produce messges
nc -lk 9999



(base) basanmachine:SparkProjects basan$ nc -lk 9999
test
test2
basan
patil
dummy producer basan


(base) basanmachine:~ $ spark-shell --master local[2]


cala> (base) basanmachine:~ basan$ spark-shell --master local[2]
2019-12-07 15:26:24 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[2], app id = local-1575712589217).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala>


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc= new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word =>(word,1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()




scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._

scala> val ssc= new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@350342e0

scala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@53079ae6

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@423762ae

scala> val pairs = words.map(word =>(word,1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@4e9695cf

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@2351255a

scala> wordCounts.print()

scala>



scala> ssc.start()

scala> 2019-12-07 15:37:12 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:12 WARN  BlockManager:66 - Block input-0-1575713232400 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713235000 ms
-------------------------------------------
(test2,1)
(test,1)
(patil,1)
(basan,1)

-------------------------------------------
Time: 1575713240000 ms
-------------------------------------------

-------------------------------------------
Time: 1575713245000 ms
-------------------------------------------

2019-12-07 15:37:25 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:25 WARN  BlockManager:66 - Block input-0-1575713245200 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713250000 ms
-------------------------------------------
(test2,1)
(3,1)
(tet,1)

[Stage 0:>                                                          (0 + 1) / 1]2019-12-07 15:37:34 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:34 WARN  BlockManager:66 - Block input-0-1575713253800 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713255000 ms
-------------------------------------------
(basana,1)

[Stage 0:>                                                          (0 + 1) / 1]2019-12-07 15:37:39 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:39 WARN  BlockManager:66 - Block input-0-1575713259600 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713260000 ms
-------------------------------------------
(patil,1)
(basan,1)

-------------------------------------------

==================================================================================================================
pushing messages from socket
(base) basanmachine:SparkProjects basan$ nc -lk 9999
basan patil
test test2
test2 tet 3
basana
patil basan


==================================================================================================================
Observe the results, it just takes the aggregation of 5 seconds and not worried of previous results. This is the behaviour
in stateless transformation.


stateless transformation : For each RDD operation was applied and ignores the prvious results.

==================================================================================================================
statefull transformation :
We can get details from the starting of stream, or we can get few RDDS by defining window.

updateStateByKey(updateFn) - is the stateful function.

Pair RDD : tuple of elements.

==================================================================================================================

Window Operations :
Worried of last certail time interval.

summary function - consdering new ocming RDD operation
Inverse function - subtracting outgoing RDD operation

checkpoint is needed to  track the changes.

countByTwo - is the stateful operation

line.countByTwo(10, 2)

window size is 10 and 2 is the sliding interval.
Every 2 sec it will remove one old rdd and adds new rdd


sliding interval : has to be multiple of the Streaming Context inteval as it cannot read half RDD.

==================================================================================================================

If the producer is much faster compared to consumer then the data loss can happem in socketstream. Thats where buffering
mechanism like kafka.
All streaming solutions uses kafka.

TwitterStream can be the example.

No comments:

Post a Comment