
Saturday, November 30, 2019

DataFrame simple operations


package com.basan.day5.df

import org.apache.spark.sql._
import org.apache.log4j._

object LoggingWithCaseClass {

  case class Logging(level: String, datetime: String)

  // Parse one comma-separated line, e.g. "WARN,2016-12-31 04:19:32", into a Logging instance
  def mapper(line: String): Logging = {
    val fields = line.split(",")
    Logging(fields(0), fields(1))
  }

  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Use the new SparkSession interface introduced in Spark 2.0
    val spark = SparkSession
      .builder
      .appName("SparkSQL")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
      .getOrCreate()

    import spark.implicits._

    val mylist = List(
      "WARN,2016-12-31 04:19:32",
      "FATAL,2016-12-31 03:19:32",
      "WARN,2016-12-31 04:19:32",
      "WARN,2016-12-31 04:19:31",
      "INFO,2016-12-31 04:19:32",
      "FATAL,2016-12-31 14:19:32")

    val rdd1 = spark.sparkContext.parallelize(mylist)
    val rdd2 = rdd1.map(mapper)

    val df1 = rdd2.toDF()
    df1.show()
    df1.createOrReplaceTempView("logging_table")

    spark.sql("Select * from logging_table ").show()
    //show(false) will display all the results
    spark.sql("Select level , collect_list(datetime) from logging_table group by level order by level").show(false)

    spark.sql("Select level , count(datetime) from logging_table group by level order by level").show(false)
    
    
    spark.sql("select level, datetime from logging_table").show() 
    
    
    

  }

}
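
The two aggregation queries can also be written with the DataFrame API instead of SQL. The sketch below is a minimal equivalent, assuming the df1 built above and the collect_list/count functions from org.apache.spark.sql.functions; the output shown further down is from the SQL version.

import org.apache.spark.sql.functions.{collect_list, count}

// Equivalent of the collect_list query, using groupBy/agg instead of SQL
df1.groupBy("level")
  .agg(collect_list("datetime"))
  .orderBy("level")
  .show(false)

// Equivalent of the count query
df1.groupBy("level")
  .agg(count("datetime"))
  .orderBy("level")
  .show(false)

Both forms go through the same Catalyst optimizer, so choosing between SQL strings and the DataFrame API is mostly a readability preference.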

Output of the program above:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+-----+-------------------+
|level|           datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+

+-----+-------------------+
|level|           datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+

+-----+---------------------------------------------------------------+
|level|collect_list(datetime)                                         |
+-----+---------------------------------------------------------------+
|FATAL|[2016-12-31 03:19:32, 2016-12-31 14:19:32]                     |
|INFO |[2016-12-31 04:19:32]                                          |
|WARN |[2016-12-31 04:19:32, 2016-12-31 04:19:32, 2016-12-31 04:19:31]|
+-----+---------------------------------------------------------------+

+-----+---------------+
|level|count(datetime)|
+-----+---------------+
|FATAL|2              |
|INFO |1              |
|WARN |3              |
+-----+---------------+

+-----+-------------------+
|level|           datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+
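
As an aside, the RDD-plus-mapper step is not strictly required for in-memory test data: once spark.implicits._ is in scope, a Seq of case-class instances can be converted straight to a DataFrame with toDF(). A minimal sketch reusing two of the rows above (df2 is just an illustrative name):

// Build the DataFrame directly from case-class instances (needs import spark.implicits._)
val df2 = Seq(
  Logging("WARN", "2016-12-31 04:19:32"),
  Logging("FATAL", "2016-12-31 03:19:32")
).toDF()

df2.show()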

