
Saturday, January 11, 2020

Spark Learnings

Learnings from Spark:
1. The result of drop has to be assigned to a variable; drop returns a new DataFrame and does not modify the one it is called on
    val validRecordsDroppedStatusColumnsDF = validRecordsDF.drop("value_vald_status")
      .drop("cat_value_vald_status")
   
2. When writing with insertInto, do not call partitionBy; the partition columns come from the target table's definition (which is why it stays commented out below)
    columnsAsPerTableValidPartitionRecordsDF.write.format("orc").mode("overwrite")
      //.partitionBy("date_val","hour_val")   // not needed: insertInto takes the partition spec from the table
      .insertInto("basan.employee")
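
   Also note that insertInto resolves columns by position rather than by name, so the DataFrame's column order must match the target table, with partition columns last. A hypothetical guard before writing (sparkSession is assumed to be in scope):

    import org.apache.spark.sql.functions.col
    // hypothetical: align the DataFrame's column order with basan.employee before insertInto
    val tableColumns = sparkSession.table("basan.employee").columns
    val orderedDF = columnsAsPerTableValidPartitionRecordsDF.select(tableColumns.map(col): _*)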

3. Setting Hive variables on a Spark session is different from setting Spark's own configuration properties, e.g.
      hive.exec.max.dynamic.partitions=2000
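
   A short sketch of the difference (assuming sparkSession was built with enableHiveSupport; the shuffle-partitions value is just an example): Spark properties go through the session conf, while Hive execution settings are issued as a SQL SET statement.

    // Spark property: set on the session conf (or via .config on the builder)
    sparkSession.conf.set("spark.sql.shuffle.partitions", "400")

    // Hive setting: issue it as SQL against the Hive-enabled session
    sparkSession.sql("SET hive.exec.max.dynamic.partitions=2000")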

4. Creating a Hive-enabled session whose overwrites only replace the partitions being written, instead of deleting all existing partitions

  import org.apache.spark.sql.SparkSession

  def getSparkSession(appName: String): SparkSession = {
    val wareHouseLocation = "/apps/hive/warehouse"
    val sparkSession = SparkSession.getActiveSession.getOrElse(SparkSession.builder
      .appName(appName)
      // dynamic partition overwrite: only partitions present in the write are replaced
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.sql.orc.impl", "native")
      .config("spark.sql.orc.enableVectorizedReader", "true")
      .config("spark.sql.warehouse.dir", wareHouseLocation)
      .enableHiveSupport()
      .getOrCreate())
    logInfo("loaded sparkSession") // logInfo is assumed to come from a logging trait mixed into the enclosing class
    sparkSession
  }
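
  A minimal usage sketch (the app name and newDataDF are illustrative; basan.employee is the table from the earlier example):

  val spark = getSparkSession("employee-loader")
  // with partitionOverwriteMode=dynamic, only the partitions present in newDataDF are overwritten; the rest are kept
  newDataDF.write.format("orc").mode("overwrite").insertInto("basan.employee")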


5. Writing to a table; observe we are passing orc as the format (SaveMode comes from org.apache.spark.sql.SaveMode)
        dataframe.write.format("orc").mode(SaveMode.Overwrite).insertInto("basan.tableName1")

6. Loading a table; always assign the result to a named DataFrame
    val dataFrame: DataFrame = sparkSession.sql("SELECT * from basan.tableName1")
7. Selecting a list of columns; col comes from org.apache.spark.sql.functions

    import org.apache.spark.sql.functions.col

    val VALID_RECORDS_WITH_PARTITION_FIELDS = Seq(
      "kafka_key",
      "partition",
      "offset",
      "timestamp",
      "topic",
      "load_date")
    val failuresColumns = VALID_RECORDS_WITH_PARTITION_FIELDS.map(col)
    val failuresReadDF = dataframe.select(failuresColumns: _*)
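
   As a side note (a sketch, not from the original post): select also has a String-based overload, so the same projection can be written without mapping to Column objects.

    // equivalent projection using the String-based overload of select
    val failuresReadDF2 = dataframe.select(
      VALID_RECORDS_WITH_PARTITION_FIELDS.head,
      VALID_RECORDS_WITH_PARTITION_FIELDS.tail: _*)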


