
Sunday, September 29, 2019

Script to copy file in hdfs

nohup sh -x movefiles-15001-20000.sh > movefiles-15001-20000.txt &


movefiles-15001-20000.sh
The script below reads the file line by line and copies each path to the target HDFS directory; a Scala sketch that does the same with the Hadoop FileSystem API follows the sample input file.



#!/bin/bash
# Read the list of HDFS paths line by line and copy each one to the archive directory.
while read -r line
do
    hdfs dfs -cp "$line" /common/basan/landing/Commitment_Loss/archive_15001_20000
    if [ $? -ne 0 ]; then
        echo -e "failure $line" >> /home_dir/basan/archive/failure15001-20000.txt
    else
        echo -e "success $line" >> /home_dir/basan/archive/successfile15001-20000.txt
    fi
done < /home_dir/basan/archive/15001-20000.txt


15001-20000.txt
/common/basan/landing/archive/demand/1-1-1565008054737.txt
/common/basan/landing/archive/demand/1-1-1565008054869.txt
/common/basan/landing/archive/demand/1-1-1565008054938.txt
/common/basan/landing/archive/demand/1-1-1565008055010.txt
/common/basan/landing/archive/demand/1-1-1565008055087.txt
/common/basan/landing/archive/demand/1-1-1565009497848.txt
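
For reference, the same copy can be driven from Scala with the Hadoop FileSystem API instead of shelling out to hdfs dfs -cp. This is only a sketch, assuming the Hadoop client jars and the cluster configuration are on the classpath; the paths are the ones used above, and CopyArchiveFiles is just an illustrative name.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import scala.io.Source

object CopyArchiveFiles {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()   // picks up core-site.xml / hdfs-site.xml from the classpath
    val fs   = FileSystem.get(conf)
    val target = new Path("/common/basan/landing/Commitment_Loss/archive_15001_20000")

    for (line <- Source.fromFile("/home_dir/basan/archive/15001-20000.txt").getLines()) {
      val src = new Path(line.trim)
      // copy within the same filesystem; deleteSource = false, overwrite = false
      val ok = FileUtil.copy(fs, src, fs, target, false, false, conf)
      println((if (ok) "success " else "failure ") + line)
    }
  }
}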

Working with Scala mutable Map

val validatorMap: scala.collection.mutable.HashMap[Int, Array[String]] = scala.collection.mutable.HashMap()
val validators = Array("min", "max", "othervalidator")
validatorMap.put(0, validators)
validatorMap.foreach(kv => {
  println("field: " + kv._1 + " num validators:" + kv._2.length)
})


output
validatorMap: scala.collection.mutable.HashMap[Int,Array[String]] = Map()
validators: Array[String] = Array(min, max, othervalidator)
res0: Option[Array[String]] = None
field: 0 num validators:3
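
The None printed above is the return value of put, which gives back the previous value bound to the key (there was none for key 0). A few other operations that are commonly used with the same map; these lines are only an illustrative follow-up, and the key 1 and the "notnull" validator are made up for the example:

validatorMap += (1 -> Array("notnull"))                        // add or overwrite a key
val forField0 = validatorMap.getOrElse(0, Array.empty[String]) // safe lookup with a default
println("field 0 has " + forField0.length + " validators")
validatorMap.remove(1)                                         // returns Option[Array[String]]
println("keys: " + validatorMap.keys.mkString(","))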

Monday, September 23, 2019

Row Encoder in Spark

/Users/basan/Documents/Spark/jsonfiles/input/sample.json
{"user_id":411,"datetime":"2017-09-01 12:00:54","os":"xp","browser":"Chrome","response_time_ms":1848,"url":"https://static.chasecdn.com/web/library/blue-boot/dist/blue-boot/2.11.1/js/main-ver.js"}
{"user_id":864,"datetime":"2017-09-01 12:01:05","os":"win7","browser":"IE","response_time_ms":4866,"url":"https://www.chase.com/"}

./spark-shell --packages org.json:json:20171018


val rdd = sc.textFile("/Users/basan/Documents/Spark/jsonfiles/input/sample.json")
rdd.take(1).foreach(println)
import java.sql.Timestamp

case class ClickStream(user_id: Long, datetime: Timestamp, os: String, browser: String, response_time_ms: Long, url: String)

import org.json.JSONObject
import java.text.SimpleDateFormat

val format = new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss")

val parsedRDD = rdd.map(x =>{val obj = new JSONObject(x)
ClickStream(obj.getLong("user_id"), new Timestamp(format.parse(obj.getString("datetime")).getTime), obj.getString("os"), obj.getString("browser"), obj.getLong("response_time_ms"), obj.getString("url"))})

parsedRDD.take(1).foreach(println)
val df = parsedRDD.toDF
df.show
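
As an aside, the same file can also be loaded without hand parsing: each line is a self-contained JSON object, so spark.read.json can infer the schema directly. This is only a comparison, not part of the RowEncoder example that follows.

val jsonDF = spark.read.json("/Users/basan/Documents/Spark/jsonfiles/input/sample.json")
jsonDF.printSchema
jsonDF.show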

__________

./spark-shell --packages org.json:json:20171018


val groupByColumns = List(("os","string"),("browser","string"))
val colToAvg = ("response_time_ms", "integer")

val DF = spark.read.text("/Users/basan/Documents/Spark/jsonfiles/input/sample.json")
DF.take(1).foreach(println)

import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.json.JSONObject
import scala.collection.mutable.ListBuffer

// build the schema dynamically from the (columnName, columnType) pairs
var schema = new StructType
for (i <- groupByColumns ++ List(colToAvg))
  schema = schema.add(i._1, i._2)
val encoder = RowEncoder.apply(schema)


val parsedDF = DF.map( x => {
  val obj = new JSONObject(x.getString(0))

  val buffer = new ListBuffer[Object]()
  for (i <- groupByColumns ++ List(colToAvg))
    buffer += obj.get(i._1)

  org.apache.spark.sql.Row(buffer.toList:_*)
})(encoder)

parsedDF.show

val results = parsedDF.groupBy(groupByColumns.map(_._1).head, groupByColumns.map(_._1).tail: _*).avg(colToAvg._1)
results.show



Observe that we pass the RowEncoder explicitly to map on the DataFrame: Spark cannot derive an implicit encoder for generic Row objects, so it has to be supplied along with the schema.
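
A stripped-down version of the same idea, with hard-coded column names instead of the dynamic list (purely illustrative; the demo data and column names are made up):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val outSchema  = StructType(Seq(StructField("os", StringType), StructField("browser", StringType)))
val outEncoder = RowEncoder.apply(outSchema)

val demo   = Seq(("xp", "Chrome"), ("win7", "IE")).toDF("os", "browser")
val mapped = demo.map(r => Row(r.getString(0).toUpperCase, r.getString(1)))(outEncoder)
mapped.show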

Tuesday, September 3, 2019

spark kafka integration - instantiating code at worker side


Best tutorial for spark and kafka integration using scala


Design Patterns for using foreachRDD
dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use this primitive correctly and efficiently. Some of the common mistakes to avoid are as follows.
Often, writing data to an external system requires creating a connection object (e.g. a TCP connection to a remote server) and using it to send data to the remote system. For this purpose, a developer may inadvertently try creating a connection object at the Spark driver and then use it in a Spark worker to save records in the RDDs. For example (in Scala),
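dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record)  // executed at the worker
  }
}

Here createNewConnection stands in for whatever client setup the application needs. This is incorrect because the connection object has to be serialized and shipped from the driver to the workers, and such objects are rarely serializable across machines, so this usually fails with serialization errors. The fix is to create (or borrow) the connection inside foreachPartition, so that a single connection serves a whole partition of records, as in the pattern below: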
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}


Very important links

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-dynamic-partition-inserts.html

http://allaboutscala.com/big-data/spark/

http://allaboutscala.com/