
Tuesday, December 13, 2022

Array of structs

 scala> import spark.implicits._

import spark.implicits._


scala> import org.apache.spark.sql.types._

import org.apache.spark.sql.types._


scala> import org.apache.spark.sql._

import org.apache.spark.sql._


scala>  val arrayStructData = Seq(

     |       Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),

     |       Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),

     |       Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),

     |       Row("Washington",null)

     |     )

arrayStructData: Seq[org.apache.spark.sql.Row] = List([James,List([Java,XX,120], [Scala,XA,300])], [Michael,List([Java,XY,200], [Scala,XB,500])], [Robert,List([Java,XZ,400], [Scala,XC,250])], [Washington,null])



    val arrayStructSchema = new StructType().add("name",StringType)

      .add("booksIntersted",ArrayType(new StructType()

        .add("name",StringType)

        .add("author",StringType)

        .add("pages",IntegerType)))



arrayStructSchema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(booksIntersted,ArrayType(StructType(StructField(name,StringType,true), StructField(author,StringType,true), StructField(pages,IntegerType,true)),true),true))


scala>     val df = spark.createDataFrame(spark.sparkContext

     |         .parallelize(arrayStructData),arrayStructSchema)

df: org.apache.spark.sql.DataFrame = [name: string, booksIntersted: array<struct<name:string,author:string,pages:int>>]


scala>     df.printSchema()

root

 |-- name: string (nullable = true)

 |-- booksIntersted: array (nullable = true)

 |    |-- element: struct (containsNull = true)

 |    |    |-- name: string (nullable = true)

 |    |    |-- author: string (nullable = true)

 |    |    |-- pages: integer (nullable = true)



scala>     df.show(false)

+----------+-----------------------------------+

|name      |booksIntersted                     |

+----------+-----------------------------------+

|James     |[[Java, XX, 120], [Scala, XA, 300]]|

|Michael   |[[Java, XY, 200], [Scala, XB, 500]]|

|Robert    |[[Java, XZ, 400], [Scala, XC, 250]]|

|Washington|null                               |

+----------+-----------------------------------+
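
As a follow-up (not part of the original transcript), the nested fields can be pulled out by exploding the array; a minimal sketch:

import org.apache.spark.sql.functions._

// explode_outer keeps the Washington row, whose booksIntersted array is null
val flattened = df
  .select(col("name"), explode_outer(col("booksIntersted")).as("book"))
  .select(col("name"), col("book.name").as("bookName"), col("book.author"), col("book.pages"))

flattened.show(false)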


Thursday, December 8, 2022

UnionByName with an empty dataframe

 import spark.implicits._

import org.apache.spark.sql.types._
import org.apache.spark.sql._

val data = Seq(("James","Sales",34), ("Michael","Sales",56),
("Robert","Sales",30), ("Maria","Finance",24) )
val df1 = data.toDF("name","dept","age")
df1.printSchema()



val schema = StructType(
StructField("name", StringType, true) ::
StructField("dept", StringType, false) ::
StructField("age", IntegerType, false) :: Nil)


val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)


val merged_df = df1.unionByName(df)
merged_df.show(false)

val merged_df2 = df.unionByName(df1)
merged_df2.show(false)
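
Here both dataframes share the same three columns, so the union simply appends zero rows. When the empty dataframe is missing a column, Spark 3.1+ can still union them through the allowMissingColumns flag. A minimal sketch, with a reduced schema made up for illustration:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Empty dataframe carrying only a subset of df1's columns (hypothetical schema)
val partialSchema = StructType(
  StructField("name", StringType, true) ::
  StructField("dept", StringType, true) :: Nil)
val partialEmpty = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], partialSchema)

// Requires Spark 3.1+; columns missing on either side are null-filled,
// so the result keeps df1's rows and the full three-column schema
val mergedLoose = df1.unionByName(partialEmpty, allowMissingColumns = true)
mergedLoose.show(false)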

Tuesday, November 29, 2022

Writing an empty dataframe in Spark

 import spark.implicits._


// Empty dataset that still has a known column layout
val data = Seq.empty[(String, String, String, String)]

val columns = Seq("firstname","lastname","country","state")

val df = data.toDF(columns:_*)


// A completely schema-less empty dataframe
val emptyDf = spark.emptyDataFrame

val path = "/Users/basan/Documents/sparktest/emptyFolder2"

emptyDf.write.format("orc").save(path)

println("emptydataframe write succeed")
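
Note that spark.emptyDataFrame has zero columns, and some file sources reject an empty schema, so the ORC write above may fail depending on the Spark version. A hedged sketch of the variant that keeps an explicit schema (the output path below is made up):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Empty dataframe that still carries column names and types
val emptySchema = StructType(
  StructField("firstname", StringType, true) ::
  StructField("lastname",  StringType, true) ::
  StructField("country",   StringType, true) ::
  StructField("state",     StringType, true) :: Nil)

val emptyWithSchema = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], emptySchema)

// No rows are written; the target directory is still created
emptyWithSchema.write.format("orc").save("/Users/basan/Documents/sparktest/emptyFolderWithSchema")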


Thursday, November 10, 2022

CDAP connect to external DB

 Add the following properties to cdap-site.xml:


<property>

    <name>data.storage.implementation</name>

    <value>postgresql</value>

    <description>

      PG as metadata store

    </description>

  </property>


  <property>

    <name>data.storage.sql.jdbc.connection.url</name>

    <value>jdbc:postgresql://localhost:5432/basanversion3</value>

    <description>

      PG JDBC details

    </description>

  </property>


  <property>

    <name>data.storage.sql.jdbc.driver.name</name>

    <value>org.postgresql.Driver</value>

    <description>

      PG jdbc driver

    </description>

  </property>


  <property>

    <name>data.storage.sql.jdbc.driver.external</name>

    <value>true</value>

    <description>

      Indicates whether the JDBC driver has to be loaded from an external directory.

      If true, then the JDBC driver directory has to be specified using

      "data.storage.sql.jdbc.driver.directory".

      If false, then the JDBC driver is present in the CDAP classpath.

      This config can only be used when the storage implementation is postgresql.

    </description>

  </property>



  <property>

    <name>data.storage.sql.jdbc.driver.directory</name>

    <value>/Users/basan/Documents/CDAP/git/postgres</value>

    <description>

      The base directory for storing JDBC driver jars.

      Sub-directory with the name that matches with the value of "data.storage.implementation" setting

      will be searched for the corresponding JDBC driver and

      dependencies jars to connect to the configured sql instance.

      The JDBC driver class to load has to be specified using "data.storage.sql.jdbc.driver.name".

      This config can only be used when the storage implementation is postgresql.

    </description>

  </property>



Create the database in Postgres with the name basanversion3.

Place the jar postgresql-42.2.24.jar in the directory /Users/basan/Documents/CDAP/git/postgres.

Tuesday, November 8, 2022

Pivot of Column

 


import spark.implicits._
import org.apache.spark.sql.functions._

val orderData = Seq(("ordernumber1", "Shipt", 3000 , "Texas"),
("ordernumber1", "DriveUp", 4600, "Florida"),
("ordernumber2", "ShipToHome", 4100 , "California"),
("ordernumber3", "Shipt", 3000, "Pennsylvania"),
("ordernumber4", "DriveUp", 5000, "Texas"),
("ordernumber5", "ShipToHome", 3300, "Florida"),
("ordernumber6", "ShipToHome", 3900, "California"),
("ordernumber7", "ShipToHome", 7000, "Pennsylvania"),
("ordernumber8", "DriveUp", 4100, "Texas"))

val df = orderData.toDF("orderNumber", "orderType", "amount" , "state" )
df.show(false)

+------------+----------+------+------------+
|orderNumber |orderType |amount|state       |
+------------+----------+------+------------+
|ordernumber1|Shipt     |3000  |Texas       |
|ordernumber1|DriveUp   |4600  |Florida     |
|ordernumber2|ShipToHome|4100  |California  |
|ordernumber3|Shipt     |3000  |Pennsylvania|
|ordernumber4|DriveUp   |5000  |Texas       |
|ordernumber5|ShipToHome|3300  |Florida     |
|ordernumber6|ShipToHome|3900  |California  |
|ordernumber7|ShipToHome|7000  |Pennsylvania|
|ordernumber8|DriveUp   |4100  |Texas       |
+------------+----------+------+------------+



val df2 = df.groupBy("orderType").pivot("state").agg(max("amount") as "maxAmount")
df2.show(false)

+----------+----------+-------+------------+-----+
|orderType |California|Florida|Pennsylvania|Texas|
+----------+----------+-------+------------+-----+
|ShipToHome|4100      |3300   |7000        |null |
|Shipt     |null      |null   |3000        |3000 |
|DriveUp   |null      |4600   |null        |5000 |
+----------+----------+-------+------------+-----+


val df2 = df.groupBy("orderType").pivot("state").agg(max("amount") as "maxAmount" , min("amount") as "minAmount")
df2.show(false)

+----------+--------------------+--------------------+-----------------+-----------------+----------------------+----------------------+---------------+---------------+
|orderType |California_maxAmount|California_minAmount|Florida_maxAmount|Florida_minAmount|Pennsylvania_maxAmount|Pennsylvania_minAmount|Texas_maxAmount|Texas_minAmount|
+----------+--------------------+--------------------+-----------------+-----------------+----------------------+----------------------+---------------+---------------+
|ShipToHome|4100                |3900                |3300             |3300             |7000                  |7000                  |null           |null           |
|Shipt     |null                |null                |null             |null             |3000                  |3000                  |3000           |3000           |
|DriveUp   |null                |null                |4600             |4600             |null                  |null                  |5000           |4100           |
+----------+--------------------+--------------------+-----------------+-----------------+----------------------+----------------------+---------------+---------------+
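
When the set of pivot values is known up front, it can be passed explicitly; Spark then skips the extra job that collects the distinct values, and the output column order is fixed. A small sketch along the same lines (the value list is taken from the sample data above):

// pivot(column, values) avoids the distinct-value scan over "state"
val states = Seq("California", "Florida", "Pennsylvania", "Texas")
val df3 = df.groupBy("orderType").pivot("state", states).agg(max("amount") as "maxAmount")
df3.show(false)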



Wednesday, October 26, 2022

ISO 8601 format to Date

 import org.apache.spark.sql.types._

import org.apache.spark.sql.functions._

import org.apache.spark.sql.Column;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

import org.apache.spark.sql.types.DataType;

import org.apache.spark.sql.types.DataTypes;


    val dfDate = Seq(("2021-01-25T13:33:44.343Z"),

    ("2019-02-05T14:06:31.556+0100"),

    ("2021-01-25T13:33:44.343+1:00")).toDF("input_timestamp")


    dfDate.show(false)


println("====   apply the to_timestamp method, loosing sub seconds=====")


  val resultdf = dfDate.withColumn("datetype_timestamp",to_timestamp(col("input_timestamp"),"yyyy-MM-dd'T'HH:mm:ss.SSSZ"))

  resultdf.printSchema

  resultdf.show(false)


println("==== explicit cast to timestamp... loosing sub seconds=====")

   val resultdf = dfDate.withColumn("datetype_timestamp",to_timestamp(col("input_timestamp"),"yyyy-MM-dd'T'HH:mm:ss.SSSZ").cast(TimestampType))

  resultdf.printSchema

  resultdf.show(false)



println("====  cast to timestamp... retains sub seconds  =====")


  val resultdf = dfDate.withColumn("datetype_timestamp",col("input_timestamp").cast(TimestampType))

   resultdf.printSchema

  resultdf.show(false)



println("====  cast to date... retains sub seconds  =====")


  val resultdf = dfDate.withColumn("datetype_timestamp",to_date(col("input_timestamp"),"yyyy-MM-dd'T'HH:mm:ss.SSSZ"))

   resultdf.printSchema

  resultdf.show(false)



println("====  parse as String  =====")


  val resultdf = dfDate.withColumn("datetype_timestamp1",date_format(col("input_timestamp"),"yyyy-MM-dd'T'HH:mm:ss.SSSZ"))

 resultdf.printSchema

  resultdf.show(false)



println("====  parsed  String cast it  =====")


  val resultdf3 = resultdf.withColumn("datetype_timestamp2",col("datetype_timestamp1").cast(TimestampType))

 resultdf3.printSchema

  resultdf3.show(false)
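
Not part of the original notes: if the goal is to keep the parsed value and also render it back as text with the milliseconds visible, one option is to cast first and then apply date_format to the timestamp column. A minimal sketch:

println("====  cast, then render back with milliseconds  =====")

// The cast keeps the sub-second part; date_format renders it out with milliseconds
val withMillis = dfDate
  .withColumn("ts", col("input_timestamp").cast(TimestampType))
  .withColumn("ts_string", date_format(col("ts"), "yyyy-MM-dd HH:mm:ss.SSS"))

withMillis.printSchema
withMillis.show(false)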

Monday, October 24, 2022

Parsing ISO 8601 format dates in Spark

 

import org.apache.spark.sql.functions._

import org.apache.spark.sql.Column;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

import org.apache.spark.sql.types.DataType;

import org.apache.spark.sql.types.DataTypes;


    val dfDate = Seq(("2021-01-25T13:33:44.343Z"),

    ("2019-02-05T14:06:31.556+0100")).toDF("input_timestamp")

  dfDate.withColumn("datetype_timestamp",to_timestamp(col("input_timestamp"),"yyyy-MM-dd'T'HH:mm:ss.SSSZ")).show(false)


  

+----------------------------+-------------------+

|input_timestamp             |datetype_timestamp |

+----------------------------+-------------------+

|2021-01-25T13:33:44.343Z    |null               |

|2019-02-05T14:06:31.556+0100|2019-02-05 18:36:31|

+----------------------------+-------------------+
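
The first row comes back null, most likely because the Z pattern letter expects a numeric offset such as +0100 rather than the literal Z suffix. Not from the original post, but one simple alternative is to lean on Spark's built-in ISO 8601 handling by casting the string to a timestamp:

import org.apache.spark.sql.types.TimestampType

// The cast uses Spark's own timestamp parsing, so both offset styles should convert;
// values are rendered in the session time zone
dfDate.withColumn("datetype_timestamp", col("input_timestamp").cast(TimestampType)).show(false)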



Monday, July 11, 2022

Special characters in Spark column names are supported

 



import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()

val data = Seq(("James","Smith","USA","CA"),
("Michael","Rose","USA","NY"),
("Robert","Williams","USA","CA"),
("Maria","Jones","USA","FL")
)
val columns = Seq("firstname","lastname","country","state")
import spark.implicits._
val df = data.toDF(columns:_*)
df.show(false)
df.select("firstname","lastname").show()

================



import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()

val data = Seq(("James","Smith","USA","CA"),
("Michael","Rose","USA","NY"),
("Robert","Williams","USA","CA"),
("Maria","Jones","USA","FL")
)
val columns = Seq("first name","last # name","coun ? try","sta te")
import spark.implicits._
val df = data.toDF(columns:_*)
df.show(false)
df.select("first name","last # name" , "coun ? try").show()


scala> df.select("first name","last # name" , "coun ? try").show()
+----------+-----------+----------+
|first name|last # name|coun ? try|
+----------+-----------+----------+
|     James|      Smith|       USA|
|   Michael|       Rose|       USA|
|    Robert|   Williams|       USA|
|     Maria|      Jones|       USA|
+----------+-----------+----------+
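
A plain select handles these names as-is, but anywhere the name passes through the SQL parser (selectExpr, expr, or a SQL statement) it has to be wrapped in backticks. A short sketch reusing the dataframe above:

// Backticks are required once the column name is parsed as a SQL expression
df.selectExpr("`first name`", "`last # name`", "`coun ? try`").show()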