package com.basan.day4.df
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
/**
 * Sample rows from friends-data.csv (ID, name, age, numFriends):
 *   0 Will 33 385
 *   1 Jean-Luc 26 2
 *   2 Hugh 55 221
 *   3 Deanna 40 465
 *   4 Quark 68 21
 *   5 Weyoun 59 318
 *   6 Gowron 37 220
 *   7 Will 54 307
 */
object DataFrameExampleSQL {

  case class Person(ID: Int, name: String, age: Int, numFriends: Int)

  // Parse one comma-separated line into a Person
  def mapper(line: String): Person = {
    val fields = line.split(",")
    Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
  }
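
  // For reference, an illustrative sketch of what mapper produces, assuming
  // the comma-separated row format implied by split(","):
  //   mapper("0,Will,33,385") == Person(0, "Will", 33, 385)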
  def main(args: Array[String]): Unit = {
    // Keep the console output readable by logging only errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .appName("Data frame test using SQL")
      .master("local[*]")
      // The warehouse-dir setting below is needed only on Windows
      // .config("spark.sql.warehouse.dir", "/Users/basan/Documents/Spark/tempwarehouse")
      // .config("spark.driver.allowMultipleContexts", "true")
      .getOrCreate()
    // Read the raw CSV file as an RDD of lines
    val lines = spark.sparkContext.textFile("/Users/basan/workspace-spark/sparkDemo1/spark-data/friends-data.csv")
    // Implicits needed for the toDS() conversion below
    import spark.implicits._

    // Convert the RDD[Person] to a Dataset and register it as a temp view
    val schemaPeople = lines.map(mapper).toDS()
    schemaPeople.createOrReplaceTempView("people")
    // Run SQL directly against the registered view
    val teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
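
    // Equivalent typed-Dataset form (a sketch, not part of the original post;
    // uncommenting it would replace the SQL version above):
    // val teenagers = schemaPeople.filter(p => p.age >= 13 && p.age <= 19)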
print("show-----")
teenagers.show(2)
print("take-----")
teenagers.take(2)
val results = teenagers.collect()
results.foreach(println)
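
    // Optional sketch: print the parsed, analyzed, optimized, and physical plans
    // to the console. explain(true) is standard Dataset API; the physical plan
    // shows the WholeStageCodegen stages mentioned at the end of this listing.
    teenagers.explain(true)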
    // Block until Enter is pressed so the Spark UI (http://localhost:4040)
    // stays available for inspection
    scala.io.StdIn.readLine()
    spark.stop()
  }
  // Look at the SQL tab in the Spark UI and observe the generated WholeStageCodegen block.
  // Check the plan to see how queries get executed; tuning can be done using the plans.
}