
Monday, February 24, 2020

Adding a column to a DataFrame with a user-defined value when the column is null

Add another column whose value depends on whether an existing column is null, using when/otherwise:


scala> val df1 = Seq(
     |            (
     |              "track1",
     |              "2019-11-18T16:11:28",
     |              "linekey1",
     |              "item1",
     |              "123"
     |            ),
     |            (
     |              "track2",
     |              "2019-11-18T16:11:28",
     |              "linekey2",
     |              "item2",
     |              null
     |            )).toDF(
     |            "number",
     |            "source_modified_timestamp",
     |            "line_key",
     |            "item_no",
     |            "tracking_number"
     |          )
df1: org.apache.spark.sql.DataFrame = [number: string, source_modified_timestamp: string ... 3 more fields]

scala> df1.show
+------+-------------------------+--------+-------+---------------+
|number|source_modified_timestamp|line_key|item_no|tracking_number|
+------+-------------------------+--------+-------+---------------+
|track1|      2019-11-18T16:11:28|linekey1|  item1|            123|
|track2|      2019-11-18T16:11:28|linekey2|  item2|           null|
+------+-------------------------+--------+-------+---------------+


scala>     df1.withColumn("tracking_number_rm", when($"tracking_number".isNull, lit("-999999")).otherwise(lit($"tracking_number"))).show
+------+-------------------------+--------+-------+---------------+------------------+
|number|source_modified_timestamp|line_key|item_no|tracking_number|tracking_number_rm|
+------+-------------------------+--------+-------+---------------+------------------+
|track1|      2019-11-18T16:11:28|linekey1|  item1|            123|               123|
|track2|      2019-11-18T16:11:28|linekey2|  item2|           null|           -999999|
+------+-------------------------+--------+-------+---------------+------------------+


scala>         df1.withColumn("tracking_number_rm", when($"tracking_number".isNull, "-1111").otherwise($"tracking_number")).show
+------+-------------------------+--------+-------+---------------+------------------+
|number|source_modified_timestamp|line_key|item_no|tracking_number|tracking_number_rm|
+------+-------------------------+--------+-------+---------------+------------------+
|track1|      2019-11-18T16:11:28|linekey1|  item1|            123|               123|
|track2|      2019-11-18T16:11:28|linekey2|  item2|           null|             -1111|
+------+-------------------------+--------+-------+---------------+------------------+
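
Note that wrapping $"tracking_number" in lit in the first statement is redundant; passing the column directly, as in the second statement, gives the same result. An equivalent approach (a sketch, assuming the same df1 and the usual spark-shell imports) is coalesce, which returns the first non-null value:

// Sketch: coalesce picks the column value when present, otherwise the default.
// Assumes df1 from above; spark.implicits._ is already in scope in spark-shell.
import org.apache.spark.sql.functions.{coalesce, lit}

df1.withColumn("tracking_number_rm",
  coalesce($"tracking_number", lit("-999999"))).show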

Saturday, February 22, 2020

Group by treats null as a separate group


val simpleData = Seq(("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  )
  val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
  df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



    // using agg to compute multiple aggregations per department
  df.groupBy("department")
    .agg(
      sum("salary").as("sum_salary"),
      avg("salary").as("avg_salary"),
      sum("bonus").as("sum_bonus"),
      max("bonus").as("max_bonus"))
    .show(false)

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |171000    |85500.0          |39000    |21000    |
+----------+----------+-----------------+---------+---------+


=====

Now repeat the same aggregation, but with a null value in the department column for one of the rows:

val simpleData = Seq(("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar",null,"NY",91000,50,21000)
  )
  val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
  df.show()


  +-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar|      null|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+

Observe that null is treated as a separate group:

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|null      |91000     |91000.0          |21000    |21000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |80000     |80000.0          |18000    |18000    |
+----------+----------+-----------------+---------+---------+
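
If the null group is not wanted, one option (a sketch, assuming the same df and that org.apache.spark.sql.functions._ is imported as above) is to replace or drop the null departments before grouping:

// Option 1: map null departments to a placeholder label before grouping.
df.na.fill("UNKNOWN", Seq("department"))
  .groupBy("department")
  .agg(sum("salary").as("sum_salary"), sum("bonus").as("sum_bonus"))
  .show(false)

// Option 2: drop rows where department is null.
df.filter($"department".isNotNull)
  .groupBy("department")
  .agg(sum("salary").as("sum_salary"), sum("bonus").as("sum_bonus"))
  .show(false)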

Thursday, February 20, 2020

Loading Hive config in spark-shell

-bash-4.2$ spark-shell --conf spark.sql.sources.partitionOverwriteMode=dynamic \
>        --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
>        --conf spark.sql.orc.impl=native \
>        --conf spark.sql.orc.enableVectorizedReader=true \
>        --conf spark.hadoop.hive.exec.max.dynamic.partitions=2550



Observe that the spark.hadoop. prefix is added in front of all the Hadoop/Hive configurations.


If we pass only --conf hive.exec.max.dynamic.partitions=2550 (without the spark.hadoop. prefix), the property is ignored by spark-shell in Spark 2.3.1.
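
To check that the setting was actually picked up, it can be read back from inside the shell (a sketch; the key names match the --conf flags above):

// The value as seen by Spark's own configuration:
spark.sparkContext.getConf.get("spark.hadoop.hive.exec.max.dynamic.partitions")

// The spark.hadoop. prefix is stripped when Spark builds the Hadoop Configuration,
// so the same value should appear here under the plain Hive key:
spark.sparkContext.hadoopConfiguration.get("hive.exec.max.dynamic.partitions")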


We can also pass memory and core configuration as below:
spark-shell --driver-memory 10G --executor-memory 15G --executor-cores 8

Sunday, February 9, 2020

Loading a local jar in spark-shell

Loading a jar from the local filesystem:
-bash-4.2$ spark-shell --jars /home_dir/basan/dpp/pipelinecore_2.11-1.2.1.jar




Loading a jar from the internet (by Maven coordinates) for spark-shell


-bash-4.2$ spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 

Loading a jar from the internet for spark-submit


spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 --queue basanqueue --driver-memory 2G --num-executors 8 --executor-memory 14G --executor-cores 6 --class com.basan.CoreTableProcessor  dpp-assembly-1.0.jar application.conf 1 2
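
Once the spark-sql-kafka-0-10 package is on the classpath via --packages, the "kafka" data source becomes available. A minimal sketch of using it from spark-shell (the broker address and topic name below are placeholders, not from the original job):

// Sketch: read a Kafka topic as a streaming DataFrame.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "some_topic")
  .load()

// Kafka key/value columns arrive as binary; cast them to strings to inspect.
kafkaDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").printSchema()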