/Users/basan/Documents/Spark/jsonfiles/input/sample.json
{"user_id":411,"datetime":"2017-09-01 12:00:54","os":"xp","browser":"Chrome","response_time_ms":1848,"url":"https://static.chasecdn.com/web/library/blue-boot/dist/blue-boot/2.11.1/js/main-ver.js"}
{"user_id":864,"datetime":"2017-09-01 12:01:05","os":"win7","browser":"IE","response_time_ms":4866,"url":"https://www.chase.com/"}
./spark-shell --packages org.json:json:20171018
// Read the raw JSON-lines file as an RDD of strings (one JSON object per line).
val rdd = sc.textFile("/Users/basan/Documents/Spark/jsonfiles/input/sample.json")
rdd.take(1).foreach(println)

import java.sql.Timestamp
// One strongly-typed record per input line of the click-stream log.
case class ClickStream(user_id: Long, datetime: Timestamp, os: String, browser: String, response_time_ms: Long, url: String)

import org.json.JSONObject
import java.text.SimpleDateFormat
// Pattern matches the "2017-09-01 12:00:54" timestamps in the data.
// (The original quoted-space pattern "yyyy-MM-dd' 'HH:mm:ss" was redundant.)
// NOTE(review): SimpleDateFormat is not thread-safe; fine for local spark-shell use.
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

// Parse each raw JSON line into a ClickStream record using org.json.
val parsedRDD = rdd.map { x =>
  val obj = new JSONObject(x)
  ClickStream(
    obj.getLong("user_id"),
    new Timestamp(format.parse(obj.getString("datetime")).getTime),
    obj.getString("os"),
    obj.getString("browser"),
    obj.getLong("response_time_ms"),
    obj.getString("url"))
}
parsedRDD.take(1).foreach(println)

// .toDF on an RDD of case classes needs the session's implicit encoders
// (spark-shell pre-imports these, but a script run via :load does not).
import spark.implicits._
val df = parsedRDD.toDF
df.show
__________
./spark-shell --packages org.json:json:20171018
// Columns to group by and the column to average, as (name, typeName) pairs.
val groupByColumns = List(("os", "string"), ("browser", "string"))
val colToAvg = ("response_time_ms", "integer")

// Read each JSON line as a single string column ("value") — no schema inference.
val DF = spark.read.text("/Users/basan/Documents/Spark/jsonfiles/input/sample.json")
DF.take(1).foreach(println)

import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.json.JSONObject
import scala.collection.mutable.ListBuffer

// Build the target schema from the column/type pairs.
// (Reconstructed: the original post's for-loop was mangled by HTML escaping.)
var schema = new StructType
for (i <- groupByColumns ++ List(colToAvg)) schema = schema.add(i._1, i._2)

// Explicit Row encoder so map() over a DataFrame can return untyped Rows.
val encoder = RowEncoder.apply(schema)

// Extract just the columns we need from each JSON line, in schema order.
val parsedDF = DF.map { x =>
  val obj = new JSONObject(x.getString(0))
  val buffer = new ListBuffer[Object]()
  for (i <- groupByColumns ++ List(colToAvg)) buffer += obj.get(i._1)
  org.apache.spark.sql.Row(buffer.toList: _*)
}(encoder)
parsedDF.show

// Average response_time_ms for each (os, browser) group.
val results = parsedDF.groupBy(groupByColumns.map(_._1).head, groupByColumns.map(_._1).tail: _*).avg(colToAvg._1)
results.show
Observe that we supply an explicit Encoder when mapping over the DataFrame.
->->
{"user_id":411,"datetime":"2017-09-01 12:00:54","os":"xp","browser":"Chrome","response_time_ms":1848,"url":"https://static.chasecdn.com/web/library/blue-boot/dist/blue-boot/2.11.1/js/main-ver.js"}
{"user_id":864,"datetime":"2017-09-01 12:01:05","os":"win7","browser":"IE","response_time_ms":4866,"url":"https://www.chase.com/"}
./spark-shell --packages org.json:json:20171018
// Read the raw JSON-lines file as an RDD of strings (one JSON object per line).
val rdd = sc.textFile("/Users/basan/Documents/Spark/jsonfiles/input/sample.json")
rdd.take(1).foreach(println)

import java.sql.Timestamp
// One strongly-typed record per input line of the click-stream log.
case class ClickStream(user_id: Long, datetime: Timestamp, os: String, browser: String, response_time_ms: Long, url: String)

import org.json.JSONObject
import java.text.SimpleDateFormat
// Pattern matches the "2017-09-01 12:00:54" timestamps in the data.
// (The original quoted-space pattern "yyyy-MM-dd' 'HH:mm:ss" was redundant.)
// NOTE(review): SimpleDateFormat is not thread-safe; fine for local spark-shell use.
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

// Parse each raw JSON line into a ClickStream record using org.json.
val parsedRDD = rdd.map { x =>
  val obj = new JSONObject(x)
  ClickStream(
    obj.getLong("user_id"),
    new Timestamp(format.parse(obj.getString("datetime")).getTime),
    obj.getString("os"),
    obj.getString("browser"),
    obj.getLong("response_time_ms"),
    obj.getString("url"))
}
parsedRDD.take(1).foreach(println)

// .toDF on an RDD of case classes needs the session's implicit encoders
// (spark-shell pre-imports these, but a script run via :load does not).
import spark.implicits._
val df = parsedRDD.toDF
df.show
__________
./spark-shell --packages org.json:json:20171018
// Columns to group by and the column to average, as (name, typeName) pairs.
val groupByColumns = List(("os", "string"), ("browser", "string"))
val colToAvg = ("response_time_ms", "integer")

// Read each JSON line as a single string column ("value") — no schema inference.
val DF = spark.read.text("/Users/basan/Documents/Spark/jsonfiles/input/sample.json")
DF.take(1).foreach(println)

import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.json.JSONObject
import scala.collection.mutable.ListBuffer

// Build the target schema from the column/type pairs.
// (Reconstructed: the original post's for-loop was mangled by HTML escaping.)
var schema = new StructType
for (i <- groupByColumns ++ List(colToAvg)) schema = schema.add(i._1, i._2)

// Explicit Row encoder so map() over a DataFrame can return untyped Rows.
val encoder = RowEncoder.apply(schema)

// Extract just the columns we need from each JSON line, in schema order.
val parsedDF = DF.map { x =>
  val obj = new JSONObject(x.getString(0))
  val buffer = new ListBuffer[Object]()
  for (i <- groupByColumns ++ List(colToAvg)) buffer += obj.get(i._1)
  org.apache.spark.sql.Row(buffer.toList: _*)
}(encoder)
parsedDF.show

// Average response_time_ms for each (os, browser) group.
val results = parsedDF.groupBy(groupByColumns.map(_._1).head, groupByColumns.map(_._1).tail: _*).avg(colToAvg._1)
results.show
Observe that we supply an explicit Encoder when mapping over the DataFrame.
kayseriescortu.com - alacam.org - xescortun.com
ReplyDeleteMmorpg
ReplyDeleteinstagram takipçi satın al
Tiktok jeton hilesi
tiktok jeton hilesi
antalya saç ekimi
referans kimliği nedir
İnstagram takipçi satın al
METİN2 PVP SERVERLER
instagram takipçi satın al