Learnings about Spark
1. drop returns a new DataFrame, so the result has to be assigned to a variable
// The DataFrame returned by drop is what gets used from here on, so it is bound to a new val.
val validRecordsDroppedStatusColumnsDF =
  validRecordsDF.drop("value_vald_status", "cat_value_vald_status")
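2. Writing into an existing Hive table with insertInto; partitionBy stays commented out because insertInto cannot be combined with it
columnsAsPerTableValidPartitionRecordsDF.write.format("orc").mode("overwrite")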
//.partitionBy("date_val","hour_val")
.insertInto("basan.employee")
3. Setting variables in Spark is different from setting Spark configuration properties, for example:
hive.exec.max.dynamic.partitions=2000
4. Creating a session that avoids deleting any of the partitions and interacts with Hive
/**
  * Returns the SparkSession to use for this application.
  *
  * When `SparkSession.getActiveSession` already holds a session, that session is returned
  * as-is and none of the settings below are applied to it. Otherwise the call falls back to
  * a Hive-enabled builder and its `getOrCreate()`.
  *
  * @param appName application name passed to the builder
  * @return the active session if there is one, otherwise the builder's session
  */
def getSparkSession(appName: String): SparkSession = {
  val session = SparkSession.getActiveSession match {
    case Some(active) => active
    case None =>
      val hiveWarehouseDir = "/apps/hive/warehouse"
      SparkSession.builder
        .appName(appName)
        // "dynamic" mode is what keeps an overwrite from clearing partitions it does not write.
        .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .config("spark.sql.orc.impl", "native")
        .config("spark.sql.orc.enableVectorizedReader", "true")
        .config("spark.sql.warehouse.dir", hiveWarehouseDir)
        .enableHiveSupport()
        .getOrCreate()
  }
  logInfo("loaded sparkSession")
  session
}
5. Writing to a table; note that we pass "orc" as the format
// Overwrite-mode insert into the existing table, with "orc" given as the writer format.
// (Fixed: the format literal had a stray second closing quote, which broke the statement.)
dataframe.write.format("orc").mode(SaveMode.Overwrite).insertInto("basan.tableName1")
6. Loading table data; always assign the result to a named variable
// Runs the query through sparkSession.sql and binds the result to an explicitly typed val.
val dataFrame: DataFrame = sparkSession.sql("SELECT * from basan.tableName1")
7. selecting columns
// Names of the columns to keep in the selection.
val VALID_RECORDS_WITH_PARTITION_FIELDS = Seq(
  "kafka_key", "partition", "offset", "timestamp", "topic", "load_date"
)
// Each name is mapped through col so the list can be expanded as varargs for select.
val failuresColumns = VALID_RECORDS_WITH_PARTITION_FIELDS.map(name => col(name))
val FailuresReadDF = dataframe.select(failuresColumns: _*)
1. drop returns a new DataFrame, so the result has to be assigned to a variable
// The DataFrame returned by drop is what gets used from here on, so it is bound to a new val.
val validRecordsDroppedStatusColumnsDF =
  validRecordsDF.drop("value_vald_status", "cat_value_vald_status")
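2. Writing into an existing Hive table with insertInto; partitionBy stays commented out because insertInto cannot be combined with it
columnsAsPerTableValidPartitionRecordsDF.write.format("orc").mode("overwrite")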
//.partitionBy("date_val","hour_val")
.insertInto("basan.employee")
3. Setting variables in Spark is different from setting Spark configuration properties, for example:
hive.exec.max.dynamic.partitions=2000
4. Creating a session that avoids deleting any of the partitions and interacts with Hive
/**
  * Returns the SparkSession to use for this application.
  *
  * When `SparkSession.getActiveSession` already holds a session, that session is returned
  * as-is and none of the settings below are applied to it. Otherwise the call falls back to
  * a Hive-enabled builder and its `getOrCreate()`.
  *
  * @param appName application name passed to the builder
  * @return the active session if there is one, otherwise the builder's session
  */
def getSparkSession(appName: String): SparkSession = {
  val session = SparkSession.getActiveSession match {
    case Some(active) => active
    case None =>
      val hiveWarehouseDir = "/apps/hive/warehouse"
      SparkSession.builder
        .appName(appName)
        // "dynamic" mode is what keeps an overwrite from clearing partitions it does not write.
        .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .config("spark.sql.orc.impl", "native")
        .config("spark.sql.orc.enableVectorizedReader", "true")
        .config("spark.sql.warehouse.dir", hiveWarehouseDir)
        .enableHiveSupport()
        .getOrCreate()
  }
  logInfo("loaded sparkSession")
  session
}
5. Writing to a table; note that we pass "orc" as the format
// Overwrite-mode insert into the existing table, with "orc" given as the writer format.
// (Fixed: the format literal had a stray second closing quote, which broke the statement.)
dataframe.write.format("orc").mode(SaveMode.Overwrite).insertInto("basan.tableName1")
6. Loading table data; always assign the result to a named variable
// Runs the query through sparkSession.sql and binds the result to an explicitly typed val.
val dataFrame: DataFrame = sparkSession.sql("SELECT * from basan.tableName1")
7. selecting columns
// Names of the columns to keep in the selection.
val VALID_RECORDS_WITH_PARTITION_FIELDS = Seq(
  "kafka_key", "partition", "offset", "timestamp", "topic", "load_date"
)
// Each name is mapped through col so the list can be expanded as varargs for select.
val failuresColumns = VALID_RECORDS_WITH_PARTITION_FIELDS.map(name => col(name))
val FailuresReadDF = dataframe.select(failuresColumns: _*)
No comments:
Post a Comment