
Sunday, April 19, 2020

Fill/substitute nulls with values in a Spark DataFrame


import spark.implicits._
val sourceDF = Seq(
           ("2019-12-09", "11", 1, "L1", "I11", "2018"),
           ("2019-12-09", "11", 2, "L2", "I10", "2018"),
           ("2019-12-09", "11", 3, "L3", "I4", "2018"),
           ("2019-12-09", "11", 4, "L4", "I4", "2015"),
           ("2019-12-09", "11", 5, "L5", "I4", "2019"),
           ("2019-12-09", "11", 6, "L6", null, "2014"),
           ("2019-12-09",  null, 7, "L7", null, "2013")

         ).toDF("date", "hour", "order", "line", "item", "time")

sourceDF.show(false)
// replace nulls in the hour and item columns with the sentinel value "-99999"
val filledDF = sourceDF.na.fill("-99999", Seq("hour", "item"))
filledDF.show(false)



scala> sourceDF.show(false)
+----------+----+-----+----+----+----+
|date      |hour|order|line|item|time|
+----------+----+-----+----+----+----+
|2019-12-09|11  |1    |L1  |I11 |2018|
|2019-12-09|11  |2    |L2  |I10 |2018|
|2019-12-09|11  |3    |L3  |I4  |2018|
|2019-12-09|11  |4    |L4  |I4  |2015|
|2019-12-09|11  |5    |L5  |I4  |2019|
|2019-12-09|11  |6    |L6  |null|2014|
|2019-12-09|null|7    |L7  |null|2013|
+----------+----+-----+----+----+----+


scala> val filledDF = sourceDF.na.fill("-99999",Seq("hour","item"))
filledDF: org.apache.spark.sql.DataFrame = [date: string, hour: string ... 4 more fields]


scala> filledDF.show(false)
+----------+------+-----+----+------+----+
|date      |hour  |order|line|item  |time|
+----------+------+-----+----+------+----+
|2019-12-09|11    |1    |L1  |I11   |2018|
|2019-12-09|11    |2    |L2  |I10   |2018|
|2019-12-09|11    |3    |L3  |I4    |2018|
|2019-12-09|11    |4    |L4  |I4    |2015|
|2019-12-09|11    |5    |L5  |I4    |2019|
|2019-12-09|11    |6    |L6  |-99999|2014|
|2019-12-09|-99999|7    |L7  |-99999|2013|
+----------+------+-----+----+------+----+


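When different columns need different replacement values, na.fill also accepts a Map from column name to value. A minimal sketch against the same sourceDF (the replacement values "-1" and "UNKNOWN" are arbitrary choices, not anything mandated by the API):

// each column gets its own replacement value
val filledPerColumnDF = sourceDF.na.fill(Map("hour" -> "-1", "item" -> "UNKNOWN"))
filledPerColumnDF.show(false)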

Friday, April 17, 2020

Fixing external Hive table access by dropping partitions

If you add or drop partitions of an external table in Hive and then try to access the table, the query can fail with a vertex failure.

MSCK REPAIR TABLE will not fix it:
MSCK REPAIR TABLE basan.shipment_ext;

To fix this, drop the affected partitions explicitly:

show partitions basan.shipment_ext;

-- drop a single partition
ALTER TABLE basan.shipment_ext DROP IF EXISTS PARTITION (order_date = '2019-07-15');

-- drop multiple partitions with a comparator
ALTER TABLE basan.shipment_ext DROP IF EXISTS PARTITION (order_date > '2018-07-15');
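The single-partition drop can also be issued from Spark through a Hive-enabled session. A minimal sketch, assuming the SparkSession was built with enableHiveSupport() (the comparator form of DROP PARTITION above is Hive syntax and may need to be run in Hive itself):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("drop-stale-partitions")
  .enableHiveSupport()
  .getOrCreate()

// inspect the partitions, then drop the stale one by an exact spec
spark.sql("SHOW PARTITIONS basan.shipment_ext").show(false)
spark.sql("ALTER TABLE basan.shipment_ext DROP IF EXISTS PARTITION (order_date = '2019-07-15')")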

Saturday, April 11, 2020

Spark with a MySQL database (Spark JDBC)

Install MySQL Server 8.0.19 and create the schema basan and the table shipment:

CREATE TABLE `shipment` (
  `order_number` text,
  `order_date` text,
  `ship_status` text
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;


  1. Make sure the connector jar is mysql-connector-java-8.0.19.jar and the Spark version is 2.3.2.
  2. Copy mysql-connector-java-8.0.19.jar to the Spark installation's jars directory (the one holding jars such as datanucleus-core-3.2.10.jar).
  3. Run spark-shell and run the commands below to interact with MySQL over JDBC.
 
// read the shipment table over JDBC
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/basan?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver") // legacy driver class; Connector/J 8.x also accepts com.mysql.cj.jdbc.Driver
  .option("dbtable", "shipment")
  .option("user", "root")
  .option("password", "root1234")
  .load()
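
A quick sanity check on the read, plus a variant that pushes the filtering down to MySQL: dbtable also accepts a parenthesized subquery, as long as it is given an alias (t below). A sketch with the same connection details as above:

jdbcDF.printSchema()
jdbcDF.show(false)

// let MySQL evaluate the WHERE clause instead of Spark
val deliveredDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/basan?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "(SELECT * FROM shipment WHERE ship_status = 'delivered') t")
  .option("user", "root")
  .option("password", "root1234")
  .load()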

import spark.implicits._

val someDF = Seq(
  ("ordernumberinsert1", "2020-02-05", "delivered"),
  ("ordernumberinsert2", "2020-02-05", "delivered"),
  ("ordernumberinsert3", "2020-02-05", "delivered")
).toDF("order_number", "order_date","ship_status")
// SaveMode.Overwrite drops and recreates the table by default before writing
someDF.write.mode("overwrite")
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/basan?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "shipment")
  .option("user", "root")
  .option("password", "root1234")
  .save()
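
Because overwrite drops and recreates the table, the column definitions from the CREATE TABLE above are replaced by whatever schema Spark derives from the DataFrame. To add rows while keeping the existing table definition and data, append is the safer mode; a minimal sketch with the same options:

// append new rows without touching the existing table definition
someDF.write.mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/basan?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "shipment")
  .option("user", "root")
  .option("password", "root1234")
  .save()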