package com.basan.day5.df

import org.apache.spark.sql._
import org.apache.log4j._
object LoggingWithCaseClass {

  // Each input line carries a log level and a timestamp string.
  case class Logging(level: String, datetime: String)

  // Parse one comma-separated line into a Logging instance.
  def mapper(line: String): Logging = {
    val fields = line.split(",")
    Logging(fields(0), fields(1))
  }
  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Use the SparkSession interface introduced in Spark 2.0
    val spark = SparkSession
      .builder
      .appName("SparkSQL")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp") // Works around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
      .getOrCreate()

    // Needed for the implicit toDF() conversion below
    import spark.implicits._
    val mylist = List(
      "WARN,2016-12-31 04:19:32",
      "FATAL,2016-12-31 03:19:32",
      "WARN,2016-12-31 04:19:32",
      "WARN,2016-12-31 04:19:31",
      "INFO,2016-12-31 04:19:32",
      "FATAL,2016-12-31 14:19:32")
    // Parallelize the list, parse each line, and convert to a DataFrame
    val rdd1 = spark.sparkContext.parallelize(mylist)
    val rdd2 = rdd1.map(mapper)
    val df1 = rdd2.toDF()
    df1.show()

    // Register the DataFrame as a temp view so it can be queried with SQL
    df1.createOrReplaceTempView("logging_table")
    spark.sql("select * from logging_table").show()

    // show(false) displays full column values without truncating them
    spark.sql("select level, collect_list(datetime) from logging_table group by level order by level").show(false)
    spark.sql("select level, count(datetime) from logging_table group by level order by level").show(false)
    spark.sql("select level, datetime from logging_table").show()
  }
}
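The same aggregations can also be written against df1 directly with the DataFrame API instead of SQL. A minimal sketch, not part of the original program; it assumes it runs inside main, where df1 is in scope:

import org.apache.spark.sql.functions.{collect_list, count}

// DataFrame-API equivalents of the collect_list and count queries above
df1.groupBy("level")
  .agg(collect_list("datetime").as("datetimes"))
  .orderBy("level")
  .show(false)

df1.groupBy("level")
  .agg(count("datetime").as("cnt"))
  .orderBy("level")
  .show(false)

Both forms are handled by the same query engine, so choosing between them is largely a matter of style.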
Output of the program:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

df1.show():
+-----+-------------------+
|level| datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+
select * from logging_table:
+-----+-------------------+
|level| datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+
select level, collect_list(datetime) from logging_table group by level order by level:
+-----+---------------------------------------------------------------+
|level|collect_list(datetime) |
+-----+---------------------------------------------------------------+
|FATAL|[2016-12-31 03:19:32, 2016-12-31 14:19:32] |
|INFO |[2016-12-31 04:19:32] |
|WARN |[2016-12-31 04:19:32, 2016-12-31 04:19:32, 2016-12-31 04:19:31]|
+-----+---------------------------------------------------------------+
select level, count(datetime) from logging_table group by level order by level:
+-----+---------------+
|level|count(datetime)|
+-----+---------------+
|FATAL|2 |
|INFO |1 |
|WARN |3 |
+-----+---------------+
select level, datetime from logging_table:
+-----+-------------------+
|level| datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+
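Note that the datetime column here is stored as a plain string. To get real timestamp semantics (time arithmetic, min/max over actual times), it can be cast with to_timestamp, which is available in Spark 2.2+. A sketch under that assumption, again inside main where df1 and spark.implicits._ are in scope; the ts column and logging_ts view names are illustrative:

import org.apache.spark.sql.functions.to_timestamp

// Parse the string column into a proper timestamp column;
// the format matches the "yyyy-MM-dd HH:mm:ss" values in mylist.
val dfTs = df1.withColumn("ts", to_timestamp($"datetime", "yyyy-MM-dd HH:mm:ss"))
dfTs.createOrReplaceTempView("logging_ts")
spark.sql("select level, min(ts), max(ts) from logging_ts group by level order by level").show(false)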