
Monday, September 23, 2019

Row Encoder in Spark

Sample input (one JSON object per line) at /Users/basan/Documents/Spark/jsonfiles/input/sample.json:
{"user_id":411,"datetime":"2017-09-01 12:00:54","os":"xp","browser":"Chrome","response_time_ms":1848,"url":"https://static.chasecdn.com/web/library/blue-boot/dist/blue-boot/2.11.1/js/main-ver.js"}
{"user_id":864,"datetime":"2017-09-01 12:01:05","os":"win7","browser":"IE","response_time_ms":4866,"url":"https://www.chase.com/"}

Launch the Spark shell with the org.json parser on the classpath:

./spark-shell --packages org.json:json:20171018


// Read the raw JSON lines as an RDD[String]
val rdd = sc.textFile("/Users/basan/Documents/Spark/jsonfiles/input/sample.json")
rdd.take(1).foreach(println)
import java.sql.Timestamp

case class ClickStream(user_id: Long, datetime: Timestamp, os: String, browser: String, response_time_ms: Long, url: String)

import org.json.JSONObject
import java.text.SimpleDateFormat

val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

val parsedRDD = rdd.map { x =>
  val obj = new JSONObject(x)
  ClickStream(obj.getLong("user_id"), new Timestamp(format.parse(obj.getString("datetime")).getTime),
    obj.getString("os"), obj.getString("browser"), obj.getLong("response_time_ms"), obj.getString("url"))
}

parsedRDD.take(1).foreach(println)
val df = parsedRDD.toDF
df.show
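
For comparison, Spark's built-in JSON data source can read the same line-delimited file and infer a schema on its own. A minimal sketch; note that datetime comes back as a plain string here, whereas the manual parse above yields a proper Timestamp:

val jsonDF = spark.read.json("/Users/basan/Documents/Spark/jsonfiles/input/sample.json")
jsonDF.printSchema
jsonDF.show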

__________

Now the same data, but with the columns to group and average driven by configuration, using a RowEncoder instead of a case class. Launch the shell the same way:

./spark-shell --packages org.json:json:20171018


// (column name, Spark SQL type name) pairs, as if they came from configuration
val groupByColumns = List(("os", "string"), ("browser", "string"))
val colToAvg = ("response_time_ms", "integer")

// Read each JSON line as a single-string row; parsing happens in the map below
val DF = spark.read.text("/Users/basan/Documents/Spark/jsonfiles/input/sample.json")
DF.take(1).foreach(println)

import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.json.JSONObject
import scala.collection.mutable.ListBuffer

// Build the schema dynamically from the configured columns
var schema = new StructType
for (i <- groupByColumns ++ List(colToAvg))
  schema = schema.add(i._1, i._2)

val encoder = RowEncoder.apply(schema)
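
To confirm what the loop assembled, print the schema tree; with the configuration above it should look like this:

schema.printTreeString()
// root
//  |-- os: string (nullable = true)
//  |-- browser: string (nullable = true)
//  |-- response_time_ms: integer (nullable = true)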


val parsedDF = DF.map { x =>
  val obj = new JSONObject(x.getString(0))

  // Pull the configured columns out of the JSON in the same order as the schema
  val buffer = new ListBuffer[Object]()
  for (i <- groupByColumns ++ List(colToAvg))
    buffer += obj.get(i._1)

  org.apache.spark.sql.Row(buffer.toList: _*)
}(encoder)

parsedDF.show

val results = parsedDF.groupBy(groupByColumns.map(_._1).head, groupByColumns.map(_._1).tail: _*).avg(colToAvg._1)
results.show
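
The same aggregation can also be written with agg, which makes it easy to alias the output column or add more metrics. A sketch against the same parsedDF:

import org.apache.spark.sql.functions.{avg, count}

val results2 = parsedDF
  .groupBy(groupByColumns.map(_._1).head, groupByColumns.map(_._1).tail: _*)
  .agg(avg(colToAvg._1).alias("avg_response_time_ms"), count("*").alias("events"))
results2.show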



Observe that we pass the encoder explicitly to the DataFrame's map call. Because the result of the map is a generic Row whose columns are only known at runtime, Spark cannot derive an encoder implicitly the way it does for case classes, so we build one from the schema with RowEncoder.
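
By contrast, when the mapped result is a case class, the implicits that spark-shell pre-imports derive the encoder automatically and nothing has to be passed explicitly. A minimal sketch, with OsBrowser as a hypothetical result type for illustration:

case class OsBrowser(os: String, browser: String)

// No explicit encoder needed: one is derived implicitly for case classes
val typedDS = DF.map { x =>
  val obj = new JSONObject(x.getString(0))
  OsBrowser(obj.getString("os"), obj.getString("browser"))
}
typedDS.show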
