
Saturday, January 11, 2020

Spark Learnings

Learnings from Spark
1. Dropping a column returns a new DataFrame, so the result has to be assigned to a variable
    val validRecordsDroppedStatusColumnsDF = validRecordsDF.drop("value_vald_status")
      .drop("cat_value_vald_status")
   
2. Writing into a partitioned Hive table with insertInto. partitionBy is commented out on purpose: insertInto() cannot be combined with partitionBy(), because the partitioning comes from the table definition itself.
      columnsAsPerTableValidPartitionRecordsDF.write.format("orc").mode("overwrite")
          //.partitionBy("date_val","hour_val")   // not needed with insertInto
          .insertInto("basan.employee")

3. Setting Hive variables on the session is different from setting Spark conf properties, e.g.
      hive.exec.max.dynamic.partitions=2000
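
A quick sketch of the two mechanisms (assuming a Hive-enabled session named sparkSession; how the Hive property gets picked up can vary by Spark/Hive version):

    // Spark properties go through spark.conf (or the builder's .config)
    sparkSession.conf.set("spark.sql.shuffle.partitions", "400")

    // Hive variables such as hive.exec.max.dynamic.partitions are issued as a SQL SET statement
    sparkSession.sql("SET hive.exec.max.dynamic.partitions=2000")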

4. Creating a session that interacts with Hive and overwrites only the partitions being written, instead of dropping the others (dynamic partition overwrite)
 def getSparkSession(appName: String): SparkSession = {
    val wareHouseLocation = "/apps/hive/warehouse"
    val sparkSession = SparkSession.getActiveSession.getOrElse(SparkSession.builder
      .appName(appName)
      // overwrite only the partitions present in the DataFrame being written
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      // native ORC implementation with vectorized reads
      .config("spark.sql.orc.impl", "native")
      .config("spark.sql.orc.enableVectorizedReader", "true")
      .config("spark.sql.warehouse.dir", wareHouseLocation)
      .enableHiveSupport()
      .getOrCreate())
    logInfo("loaded sparkSession")
    sparkSession
  }
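
A usage sketch for the helper above (the app name and the source table are illustrative; the target table basan.employee appears in point 2):

    import org.apache.spark.sql.SaveMode

    val spark = getSparkSession("partition-overwrite-job")   // hypothetical app name
    val sourceDF = spark.table("basan.staging_employee")     // hypothetical source table
    // only the partitions present in sourceDF get overwritten; the rest stay untouched
    sourceDF.write.format("orc").mode(SaveMode.Overwrite).insertInto("basan.employee")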


5. Writing to the table; observe that we pass "orc" as the format
        dataframe.write.format("orc").mode(SaveMode.Overwrite).insertInto("basan.tableName1")

6. Loading a table into a DataFrame; always assign the result to a named value
    val dataFrame: DataFrame = sparkSession.sql("SELECT * from basan.tableName1")
7. Selecting a list of columns by name (map the names to Column objects, then use select with varargs)

import org.apache.spark.sql.functions.col

val VALID_RECORDS_WITH_PARTITION_FIELDS = Seq(
    "kafka_key",
    "partition",
    "offset",
    "timestamp",
    "topic",
    "load_date")
val failuresColumns = VALID_RECORDS_WITH_PARTITION_FIELDS.map(col)
val failuresReadDF = dataframe.select(failuresColumns: _*)



Wednesday, January 8, 2020

Spark Session creation to read the ORC files in Spark 2.3.2

val sparkSession: SparkSession = SparkSession.builder.appName("session creation basan")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.orc.impl", "native")
  .config("spark.sql.orc.enableVectorizedReader", "true").master("local[*]").getOrCreate()

Saturday, January 4, 2020

Map Inversion

val m = Map("1" -> List("a","b","c")
           ,"2" -> List("a","j","k")
           ,"3" -> List("a","c"))

m flatten {case(k, vs) => vs.map((_, k))} groupBy (_._1) mapValues {_.map(_._2)}
//res0: Map[String,Iterable[String]] = Map(j -> List(2), a -> List(1, 2, 3), b -> List(1), c -> List(1, 3), k -> List(2))
Flatten the Map into a collection of (value, key) tuples. groupBy then builds a new Map whose keys are the old values. Finally, map over the grouped values to strip the tuples down to just the old keys (which were previously the values).



This inverts a Map[A, Seq[B]] into a Map[B, Seq[A]], where each B in the new map is associated with every A in the old map whose sequence contained that B.
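
The same inversion written out with flatMap, which some may find easier to follow (a sketch, using the m defined above):

    val inverted: Map[String, List[String]] =
      m.toList
        .flatMap { case (k, vs) => vs.map(v => (v, k)) }    // (value, key) pairs
        .groupBy { case (v, _) => v }                       // group by the old values
        .map { case (v, pairs) => v -> pairs.map(_._2) }    // keep only the old keys
    // inverted: Map(j -> List(2), a -> List(1, 2, 3), b -> List(1), c -> List(1, 3), k -> List(2))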