
Saturday, October 5, 2019

Creating a DataFrame in the Spark shell

This post walks through creating a DataFrame from a Scala collection in the Spark shell, then adding or replacing a column with a SQL CASE WHEN expression.

acbc32a57245:bin z002qhl$ pwd
/Users/basan/Documents/Spark/spark-2.4.1-bin-hadoop2.7/bin
acbc32a57245:bin z002qhl$ ./spark-s
spark-shell   spark-sql     spark-submit

acbc32a57245:bin basan$ ./spark-shell
19/10/06 08:43:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.4:4040
Spark context available as 'sc' (master = local[*], app id = local-1570331637922).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.1
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val data = List(("James ","","Smith","36636","M",60000),
     |         ("Michael ","Rose","","40288","M",70000),
     |         ("Robert ","","Williams","42114","",400000),
     |         ("Maria ","Anne","Jones","39192","F",500000),
     |         ("Jen","Mary","Brown","","F",0))
data: List[(String, String, String, String, String, Int)] = List(("James ","",Smith,36636,M,60000), ("Michael ",Rose,"",40288,M,70000), ("Robert ","",Williams,42114,"",400000), ("Maria ",Anne,Jones,39192,F,500000), (Jen,Mary,Brown,"",F,0))

scala> val cols = Seq("first_name","middle_name","last_name","dob","gender","salary")
cols: Seq[String] = List(first_name, middle_name, last_name, dob, gender, salary)

scala> val df = spark.createDataFrame(data).toDF(cols:_*)
df: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 4 more fields]

scala> df.show(2)
+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|  dob|gender|salary|
+----------+-----------+---------+-----+------+------+
|    James |           |    Smith|36636|     M| 60000|
|  Michael |       Rose|         |40288|     M| 70000|
+----------+-----------+---------+-----+------+------+
only showing top 2 rows
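As an aside, the same DataFrame can be built with an explicit schema instead of relying on tuple inference. A minimal sketch, assuming the same spark session (the two sample rows and the name dfWithSchema are illustrative, not from the session above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Declare each column's name, type, and nullability up front.
val schema = StructType(Seq(
  StructField("first_name", StringType, nullable = true),
  StructField("middle_name", StringType, nullable = true),
  StructField("last_name", StringType, nullable = true),
  StructField("dob", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)))

// Rows must line up with the schema positionally.
val rows = spark.sparkContext.parallelize(Seq(
  Row("James ", "", "Smith", "36636", "M", 60000),
  Row("Jen", "Mary", "Brown", "", "F", 0)))

val dfWithSchema = spark.createDataFrame(rows, schema)
dfWithSchema.printSchema()

An explicit schema is useful when the inferred types would be wrong for real data (here, dob stays a plain string either way).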


Note that spark-shell pre-imports org.apache.spark.sql.functions._, which is why col and expr below work without an explicit import statement.
scala> val df4 = df.select(col("*"),
     |       expr("case when gender = 'M' then 'Male' " +
     |                        "when gender = 'F' then 'Female' " +
     |                        "else 'Unknown' end").alias("new_gender"))
df4: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 5 more fields]

scala> df4.show(2)
+----------+-----------+---------+-----+------+------+----------+
|first_name|middle_name|last_name|  dob|gender|salary|new_gender|
+----------+-----------+---------+-----+------+------+----------+
|    James |           |    Smith|36636|     M| 60000|      Male|
|  Michael |       Rose|         |40288|     M| 70000|      Male|
+----------+-----------+---------+-----+------+------+----------+
only showing top 2 rows
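The same derived column can be written with the when/otherwise column functions instead of a SQL string inside expr; a sketch against the df defined above (df4b is an illustrative name):

import org.apache.spark.sql.functions.{col, when}

// Chained when(...).otherwise(...) builds the same CASE expression,
// but column-name typos fail analysis instead of hiding inside a string.
val df4b = df.select(col("*"),
  when(col("gender") === "M", "Male")
    .when(col("gender") === "F", "Female")
    .otherwise("Unknown")
    .alias("new_gender"))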


scala> val df3 = df.withColumn("new_gender2",
     |       expr("case when gender = 'M' then 'Male' " +
     |                        "when gender = 'F' then 'Female' " +
     |                        "else 'Unknown' end"))
df3: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 5 more fields]

scala> df3.show(2)
+----------+-----------+---------+-----+------+------+-----------+
|first_name|middle_name|last_name|  dob|gender|salary|new_gender2|
+----------+-----------+---------+-----+------+------+-----------+
|    James |           |    Smith|36636|     M| 60000|       Male|
|  Michael |       Rose|         |40288|     M| 70000|       Male|
+----------+-----------+---------+-----+------+------+-----------+
only showing top 2 rows


Watch the column name in the next command: "geneder" is misspelled, so it matches no existing column and withColumn silently adds yet another new column instead of replacing gender.

scala> val df3 = df.withColumn("geneder",
     |       expr("case when gender = 'M' then 'Male' " +
     |                        "when gender = 'F' then 'Female' " +
     |                        "else 'Unknown' end"))
df3: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 5 more fields]

scala> df3.show(2)
+----------+-----------+---------+-----+------+------+-------+
|first_name|middle_name|last_name|  dob|gender|salary|geneder|
+----------+-----------+---------+-----+------+------+-------+
|    James |           |    Smith|36636|     M| 60000|   Male|
|  Michael |       Rose|         |40288|     M| 70000|   Male|
+----------+-----------+---------+-----+------+------+-------+
only showing top 2 rows


Finally, passing the existing column name "gender" makes withColumn replace the column in place; note the echo now reads "4 more fields" instead of "5", and show() prints the mapped values under the original gender column.

scala> val df3 = df.withColumn("gender",
     |       expr("case when gender = 'M' then 'Male' " +
     |                        "when gender = 'F' then 'Female' " +
     |                        "else 'Unknown' end"))
df3: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 4 more fields]

scala> df3.show(2)
+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|  dob|gender|salary|
+----------+-----------+---------+-----+------+------+
|    James |           |    Smith|36636|  Male| 60000|
|  Michael |       Rose|         |40288|  Male| 70000|
+----------+-----------+---------+-----+------+------+
only showing top 2 rows
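The same replacement also works in plain SQL after registering the DataFrame as a temporary view; a sketch (the view name people and dfSql are illustrative):

// Expose df to SQL queries under a view name.
df.createOrReplaceTempView("people")

val dfSql = spark.sql(
  """SELECT first_name, middle_name, last_name, dob,
    |       CASE WHEN gender = 'M' THEN 'Male'
    |            WHEN gender = 'F' THEN 'Female'
    |            ELSE 'Unknown' END AS gender,
    |       salary
    |FROM people""".stripMargin)
dfSql.show(2)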


Reference: https://sparkbyexamples.com/spark/spark-case-when-otherwise-example/