Powered By Blogger

Tuesday, October 29, 2019

Read message and column transformations

import java.util.UUID

import com.typesafe.config.Config
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, _}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

import scala.collection.mutable
import scala.io.Source
import scala.tools.fusesource_embedded.jansi.AnsiConsole

/**
  * Creates a DataFrame from each Kafka micro-batch and adds derived columns to it.
  */
object DataHandlerService extends DataHandlerTrait {

  /**
    * Reads a classpath resource completely into a String.
    *
    * @param filename resource path as understood by `Class.getResource`
    *                 (a leading "/" makes it relative to the classpath root)
    * @return the full text of the resource, decoded as UTF-8
    * @throws IllegalArgumentException if the resource cannot be found
    */
  def loadResource(filename: String): String = {
    // getResource returns null for a missing resource; fail with the name instead of a bare NPE.
    val url = Option(getClass.getResource(filename)).getOrElse(
      throw new IllegalArgumentException(s"Classpath resource not found: $filename"))
    // Source.fromInputStream(getClass.getResourceAsStream("/war-and-peace.txt")).getLines().foreach(println)
    val source = Source.fromURL(url, "UTF-8")
    try source.mkString finally source.close()
  }

  /**
    * For every non-empty micro-batch: parses the record values as JSON using the schema stored in
    * the "/test-schema" resource, adds the derived columns, prints a sample, and finally commits
    * the batch's Kafka offsets.
    *
    * Only `messages` and `sparkSession` are used by this implementation; the remaining parameters
    * are kept so that existing callers continue to compile.
    *
    * @param messages     direct Kafka stream; must support `HasOffsetRanges` / `CanCommitOffsets`
    * @param sparkSession session used to build Datasets from each RDD
    */
  def processMessages(messages: InputDStream[ConsumerRecord[String, String]], sparkSession: SparkSession, errorDirPath: String,
                      successPath: String, kafkaSink: Broadcast[KafkaSinkService], brValidators: Broadcast[mutable.Map[String, Array[Validator]]],
                      brDefaultValues: Broadcast[mutable.Map[String, Any]], brTransFormerMap: Broadcast[mutable.Map[String, AttributeTransformer]]
                      , brConfig: Broadcast[Config]): Unit = {

    val fileContents = loadResource("/test-schema")
    println(fileContents)

    // The resource holds a StructType in Spark's JSON schema representation.
    val schema3 = DataType.fromJson(fileContents).asInstanceOf[StructType]
    println(schema3)

    messages.foreachRDD { rdd =>
      import sparkSession.implicits._
      // Offset ranges are only available on the RDD handed out by the Kafka stream itself,
      // so capture them before any transformation.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // isEmpty() alone is sufficient; an additional count() would run a second full job.
      if (!rdd.isEmpty()) {
        val dataSet: Dataset[String] = sparkSession.createDataset(rdd.map(_.value()))
        dataSet.show(2)

        // perform the logic on the rdd
        val streamData = sparkSession.read
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "corrupt_record")
          .schema(schema3)
          .json(dataSet)
        // val streamData = sparkSession.read.schema(schema3).json(dataSet)
        // streamData.write.json("/Users/z002qhl/Documents/Spark/SparkProjects/sparkStreaming/table")
        // streamData.write.mode("append").option("compression","snappy").format("orc").saveAsTable("z002qhl.testtable")
        streamData.printSchema()

        // streamData.foreach( mes =>  println("msg:" + mes.mkString))

        // A random UUID differs on every evaluation, so tell the optimizer not to
        // duplicate or re-evaluate the call.
        val generateUUID = udf(() => UUID.randomUUID().toString).asNondeterministic()

        // val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
        // streamData.withColumn("ts", ts)
        val decorated = streamData
          .withColumn("decorated", lit(1))
          .withColumn("current_date", current_date())
          .withColumn("unix_timestamp", unix_timestamp())
          .withColumn("Next_processing_date", date_add(current_date(), 10))
          .withColumn("uuid", generateUUID())
          .withColumn("concatedrows", concat($"action", $"atp_tracking_id"))
          .withColumn("casting", $"action".substr(0, 3))

        decorated.show(5)
      }

      // Commit even for empty batches so the consumer group position keeps advancing.
      messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
  }
}

Kafka message validation

// Schema that incoming Kafka payloads are validated against, stored on the classpath in
// Spark's JSON schema representation.
val fileContents = loadResource("/input-schema")

println(fileContents)
val schema3 = DataType.fromJson(fileContents).asInstanceOf[StructType]
println(schema3)


messages.foreachRDD(rdd => {
  import sparkSession.implicits._
  // Capture the offsets from the original Kafka RDD before transforming it.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // isEmpty() alone is sufficient; an additional count() would run a second full job.
  if (!rdd.isEmpty()) {

    val dataSet: Dataset[String] = sparkSession.createDataset(rdd.map(x => x.value()))
    dataSet.show(2)

    // perform the logic on the rdd
    // PERMISSIVE mode keeps records that fail to parse and stores their raw text in the
    // column named by columnNameOfCorruptRecord (the schema must declare that column).
    val streamData = sparkSession.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "corrupt_record")
      .schema(schema3)
      .json(dataSet)
    // val streamData = sparkSession.read.schema(schema3).json(dataSet)
    streamData.show(5)
    // streamData.write.json("/Users/z002qhl/Documents/Spark/SparkProjects/sparkStreaming/table")
    // streamData.write.mode("append").option("compression","snappy").format("orc").saveAsTable("z002qhl.testtable")
    streamData.printSchema()

    // NOTE(review): the original snippet was cut off here (`streamData.map(mes => mes.)`).
    // Splitting on corrupt_record is the presumed intent of "message validation" - confirm.
    val invalidMessages = streamData.filter($"corrupt_record".isNotNull)
    val validMessages = streamData.filter($"corrupt_record".isNull).drop("corrupt_record")
    invalidMessages.show(5, truncate = false)
    validMessages.show(5)

  }
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)


})


/**
  * Reads a classpath resource completely into a String.
  *
  * @param filename resource path as understood by `Class.getResource`
  *                 (a leading "/" makes it relative to the classpath root)
  * @return the full text of the resource, decoded as UTF-8
  * @throws IllegalArgumentException if the resource cannot be found
  */
def loadResource(filename: String): String = {
  // getResource returns null for a missing resource; fail with the name instead of a bare NPE.
  val url = Option(getClass.getResource(filename)).getOrElse(
    throw new IllegalArgumentException(s"Classpath resource not found: $filename"))
  // Source.fromInputStream(getClass.getResourceAsStream("/war-and-peace.txt")).getLines().foreach(println)
  val source = Source.fromURL(url, "UTF-8")
  try source.mkString finally source.close()
}



{
  "type": "struct",
  "fields": [
    {
      "name": "action",
      "type": "string",
      "nullable": true,
      "metadata": {

      }
    },
    {
      "name": "tracking_id",
      "type": "string",
      "nullable": true,
      "metadata": {

      }
    },
    {
          "name": "corrupt_record",
          "type": "string",
          "nullable": true,
          "metadata": {

          }
        }


  ]
}

Sunday, October 27, 2019

Schema validation of all the fields in json


You can try the following code to read the JSON file based on Schema in Spark 2.2
import org.apache.spark.sql.types.{DataType, StructType}
//Read Json Schema and Create Schema_Json
val schema_json=spark.read.json("/user/Files/ActualJson.json").schema.json
//add the schema
val newSchema=DataType.fromJson(schema_json).asInstanceOf[StructType]
//read the json files based on schema
val df=spark.read.schema(newSchema).json("Json_Files/Folder Path")

Friday, October 25, 2019

DAG Stage Jobs Tasks relationship

In this article, I will try to explain how Spark works internally and what the components of execution are: jobs, tasks, and stages.


Job Tasks stages
https://dzone.com/articles/how-spark-internally-executes-a-program


So once you perform any action on an RDD, Spark context gives your program
 to the driver.

The driver creates the DAG (directed acyclic graph) or execution plan (job)
for your program. Once the DAG is created, the driver divides this DAG into
a number of stages. These stages are then divided into smaller tasks
 and all the tasks are given to the executors for execution.


 But why did Spark divide this program into two stages? Why not more or fewer? It depends on shuffling: whenever a transformation requires Spark to shuffle data by communicating with other partitions, Spark starts a new stage for it. Transformations that do not require a shuffle are kept together in a single stage.
But why did Spark run only two tasks for each stage? The number of tasks in a stage depends on the number of partitions.


DAG (job) --> split into a number of stages --> each stage is split into tasks --> tasks are given to executors --> executors run on the machines.

One machine can have multiple executors with multiple cores.




Machine allocation executor and core

6 machines
16 cores per machine - 96 cores total
64gb ram per machine


smallest possible executor
1 core per executor - so that 96 executors we can have


largest possible
6 executors  with each executor having 16 cores
No resources for other process


best approach

96 cores in total: we have to leave 1 core on each machine for the OS.

That leaves 96 - 6 = 90 cores, i.e. 15 usable cores per machine.


15 cores each machine ... we can have 3 executors with each 5 cores

Memory: 64 GB - 1 GB for the OS = 63 GB per machine; 63 / 3 = 21 GB per executor; leave about 2 GB for YARN overhead, which gives 19 GB per executor.


SO each machine will have 3 executors , and each executor having 5 cores each





Sunday, October 20, 2019

Sqoop basics

Day2- sqoop

Only mappers will run. There will not be any reducers.


HDFS to RDBMS  can be done - sqoop export
RDBMS to HDFS can be done which is called sqoop import

By default 4 mappers will be assigned for the sqoop import execution

To look table from hadoop environment

To list databases
sqoop-list-databases \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
--password cloudera


To check list of tables in a particular db
sqoop-list-tables \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username retail_dba \
--password cloudera


To check records in the table
sqoop-eval \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
--password cloudera \
--query "select * from retail_db.customers limit 10"


create a table in sql and insert some data
CREATE TABLE people (PersonID int,LastName varchar(255),FirstName varchar(255),Address varchar(255),City varchar(255));

to just sqoop one table run below command
========================================
sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--m 1 \
--target-dir /queryresult

m implies number of mappers
target-dir implies where to copy the content of it.
target-dir should not be existing, so that system will create

Number of mappers if you increase its going to hit DB performance. Bound by number of connections.


For importing all the tables from the database
sqoop-import-all-tables \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username retail_dba \
--password cloudera \
--as-sequencefile \
-m 4 \
--warehouse-dir /user/cloudera/sqoopdir

--as-sequencefile selects the output file format (SequenceFile here); text is the default, and Avro and Parquet are also supported.
It will bring table by table.


Directory will have subfolders  /user/cloudera/sqoopdir  same as tablename.
SO extensively warehouse-dir will be used

UDF in Hive

package udf_example;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * Hive UDF that standardizes a string value by converting it to upper case.
 */
public class DataStandardization extends UDF {

    /**
     * Upper-cases the given value.
     *
     * @param input the value to standardize; may be null
     * @return the upper-cased value, or null when {@code input} is null
     */
    public String evaluate(String input) {
        if (input == null) {
            return null;
        }
        // Use a fixed locale: the no-arg toUpperCase() depends on the JVM default locale
        // (e.g. under a Turkish locale "i" becomes a dotted capital I), which would make
        // results differ between cluster nodes.
        return input.toUpperCase(java.util.Locale.ROOT);
    }

}


rankfunctions.txt
Frank,1150
Frank,1700

-- Two-column table for the rank examples; data is loaded from a comma-separated text file.
-- The row-terminator clause is spelled LINES TERMINATED BY in HiveQL.
CREATE TABLE IF NOT EXISTS rankchcektable(
    col1 string,
    col2 int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;


load data local inpath '/Users/basan/rankfunctions.txt' into table rankchcektable


Create jar

//Add the jar to hive
hive>add jar /users/basan/my_udf.jar
It will add jar to classpath

//create the function in hive

create temporary function f1 as 'udf_example.DataStandardization';

create temporary function functionname as 'packagename.classname';

//applying udf function
select f1(col1) from rankchcektable

sort by col by queries

Day2 :
Load the order.txt

create the table

order by - full sorting of data 1 reducer
select count from table2 order by count

https://drive.google.com/file/d/1abNo-jsy_l_Xo0krBoHpW3tkpSukr3Lp/view

order by will have do global sorting and it will use only one reducer.

We will run order by , sort by , cluster by  and distribute by which are related to sorting


aa,1
bb,1
dd,5
ef,2
teh,1

CREATE TABLE IF NOT EXISTS table2(
    name string,
    count int
 
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE


load data local inpath '/Users/basan/order.txt' into table table2

select count from table2 order by count

order by will take only 1 reducer , independent of how many reducers we throw
____ ___ ___ ____

//set explicitly 2 reducers
SET mapreduce.job.reduces=2
select count from table2 sort by count

it will produce 2 different data sets
sort by considers number of reducers and sorting happen in the respective reducer.
It will have
______ ___ ___

group by will also use single reducer


_______

distribute by  - will send data to specific reducer

SET mapreduce.job.reduces=2
select count from table2 distribute by count sort by count

It will make sure there is no overlap of data

Ideally we should use distribute by and then sort by to be used

By doing this we can force the 2 number of reducers

____

cluster by is same as  = distribute by count sort by count
select count from table2 cluster by count;


Rankings

create table using rankfunctions.txt

John,1500
Albert,1500
Mark,1000
Frank,1150
Frank,1700

-- Two-column table for the rank examples; data is loaded from a comma-separated text file.
-- The row-terminator clause is spelled LINES TERMINATED BY in HiveQL.
CREATE TABLE IF NOT EXISTS rankchcektable(
    col1 string,
    col2 int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;


load data local inpath '/Users/basan/rankfunctions.txt' into table rankchcektable
//overwrite the old data
load data local inpath '/Users/basan/rankfunctions.txt' overwrite into table rankchcektable



-- col1/col2 are the columns of rankchcektable (table2 has name/count).
-- Ascending order matches the sample output listed below.
select col1, col2, rank() over (order by col2) as ranking from rankchcektable

This gives the ranking: ties are assigned the same rank, and the next rank is skipped.


Mark,1000 1
Frank,1150 2
John,1500  3
Albert,1500 3
Frank,1700 5

Difference between rank, dense rank and rownum


-- col1/col2 are the columns of rankchcektable (table2 has name/count).
-- Ascending order matches the sample output listed below.
select col1, col2, dense_rank() over (order by col2) as ranking from rankchcektable
Ties are  assigned same rank,Ranks are not skipped.

Mark,1000 1
Frank,1150 2
John,1500  3
Albert,1500 3
Frank,1700 4

____

row number : To find top n rankings it will be used

-- col1/col2 are the columns of rankchcektable (table2 has name/count).
-- Ascending order matches the sample output listed below.
select col1, col2, row_number() over (order by col2) as ranking from rankchcektable

Mark,1000 1
Frank,1150 2
John,1500  3
Albert,1500 4
Frank,1700 5

So rank assigned will be unique

____

-- col1/col2 are the columns of rankchcektable (table2 has name/count).
select col1, col2, row_number() over (partition by col1 order by col2 desc) as ranking from rankchcektable

First it does grouping then assigns ranking

use case is top 2 spend per person.

Albert 1500 1
Bhut 800 1
lesa 1500 1
lesa 900 2



Saturday, October 19, 2019

BigData concepts in a nutshell

In the version of HDP1.0 we had only HDFS and MR

HDP 2.0 we started having HDFS , MR and YARN

With Spark, YARN, Mesos or Kubernetes can be used as the resource negotiator.

HDFS : Addresses distributed storage issues
Pig : It is a scripting language which can be used for ETL-style jobs. In the data pipeline we do the cleaning with Pig scripts and then load the result into a Hive table.
Hbase : It is a NoSQL database which provides ACID behaviour. With Hive we will not be able to update or delete a record. With Hbase we will be able to update or delete a record.

There is a way to make Hive table accessible in Hbase.

Oozie : It will be used for scheduling jobs.

 Usually scenarios involving processing of table goes to hive, search for a key related use cases will fall in Hbase


Pig uses only Mapper phase , all the other components will be using Mapper and Reducer



The Balanced approach of 2-racks and 3-copies in Rack awareness mechanism is adopted to
Minimize Write-bandwidth and Maximize Redundancy.

Name node federation concept is meant for -
Load sharing.

In which of the following scenarios would the introduction of combiner can lead to wrong results -
Calculating the average.

Consider the following table structure:students (name STRING,id INT,subjects ARRAY,feeDetails MAP,phoneNumber STRUCT ) . To list the subjects taken by each student, we can use the following query, which executes successfully: select name, explode(subjects) from students;
False

Which of the following work-flow is valid in MR
map->partition->shuffle->sort->reduce.

Name node federation : the metadata can be divided across multiple name nodes. It is for load sharing.
Secondary name node : for checkpointing and fault tolerance.

AWS Setup of HDFS

EC2 setup, free credits cannot be used for EMR cluster
AWS Management Console

Services - Search for EMR -
Create cluster
provide clustername - basancluster
select emr-5.27.0
which will have hadoop Ganglia hive

Can use m4.large machine - 0.10$/hr per instance cost


Google for ec2 pricing : can check the price of machines

number of instances 3
create key pair and configure the AWS to access cluster using PEM file

s3 is free in AWS
It will take some time to create the AWS cluster after the request.


Enable ssh to the master for accessing via ssh command
    click on create security group

inbound  : traffic coming inside the system, to access ssh create inbound rule

Edit inbound rules :
ssh 22 select : myip

Once this is done it will take some time for creating the cluster

Connect to the master node using below command
ssh -i filecreated.pem hadoop@ip.compute.amazonaws.com


AWS commands
hive
create database trendytech;
use trendytech;
show table

Upload the file to S3 by creating a bucket. (Search for S3 and create a bucket; the folder and file
can be created under it.)
In the services -> search for s3 buckets

loading data from s3 to table
load data  inpath 's3://basantech-basan/dataset.csv' into table country_input

remove the word local

Under the bucket create the folder and the folder path can be configured

Amazon - AWS - EMR - Elastic map reduce
Free tier is not valid for emr in AWS
Google - GCP- GoogleDataProc
Microsoft - Azure - HD Insight

AWS is very famous among all the clusters

Need not use Cloudera Manager.

hadoop fs -ls /user/hive/warehouse can be seen in elastic search as well.

Search on Services - EMR
Terminate the cluster when you are done, to stop the service and avoid getting charged on your credit card.

HDFS Loading files and creating TEXTFORMAT and ORC files

Create the Text Format table
       

CREATE TABLE IF NOT EXISTS country_input(
    longitide float,
    lattitude float,
    number int,
    street string,
    unit string,
    city string,
    district string,
    region string,
    postcode int,
    id string,
    hash string,
    country string  
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE

//loading the table from the file located in the path
load data local inpath '/Users/basan/datasetnew' into table country_input


select * from country_input where country='belgium' and street='Rue Ketels';

       
 
       

//Create table in ORC FORMAT

CREATE TABLE IF NOT EXISTS country_input_orc(
    longitide float,
    lattitude float,
    number int,
    street string,
    unit string,
    city string,
    district string,
    region string,
    postcode int,
    id string,
    hash string,
    country string
)ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS orc


ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' is not needed in ORC file format.

For populating into ORC table from the text file
insert into table country_input_orc select * from country_input

 /user/hive/warehouse - location for ORC tables


       
 
ORC tables will occupy less disk space and selection will be much faster compared to TEXTFORMAT file. If we want to still make the queries faster then we have to use partitioning and bucketing. For each partition there will be a folder created, and for each bucket there will be a file created under the partitioned folder. Once we define the size of bucket we cannot change, if needs to be changed then we have to create fresh table and import data from old table
       


-- Text table partitioned by country and bucketed by street into 32 buckets.
-- The partition column is declared only in PARTITIONED BY: Hive rejects a column that
-- appears both in the regular column list and as a partition column.
CREATE TABLE IF NOT EXISTS country_bucket(
    longitide float,
    lattitude float,
    number int,
    street string,
    unit string,
    city string,
    district string,
    region string,
    postcode int,
    id string,
    hash string
 )PARTITIONED BY (country string) CLUSTERED BY (street)
 into 32 buckets
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE


We have created table with country partitioning and 32 buckets

//Getting size of the underlaying folders 
 hadoop fs -du -h /user/hive/warehouse/trendytech.db
 hadoop fs -ls /user/hive/warehouse/trendytech.db/country_bucket

For each partition there will be one folder created

 hadoop fs -ls /user/hive/warehouse/trendytech.db/country_bucket/country=estonia
 will list all the files under the partitioned folder

       
 


//Getting size of the underlaying folders
 hadoop fs -du -h /user/hive/warehouse/trendytech.db
 hadoop fs -ls /user/hive/warehouse/trendytech.db/country_bucket

For each partition there will be one folder created

 hadoop fs -ls /user/hive/warehouse/trendytech.db/country_bucket/country=estonia
 will list all the files under the partitioned folder

Sunday, October 13, 2019

Date Time Operation in Scala and Spark


scala> import java.sql.Date
import java.sql.Date

scala> import org.apache.spark.sql.types.{DateType, IntegerType}
import org.apache.spark.sql.types.{DateType, IntegerType}

scala> Date.valueOf("2016-09-30")
res0: java.sql.Date = 2016-09-30

scala> import java.time.LocalDateTime
import java.time.LocalDateTime

scala>

scala> import java.sql.Timestamp
import java.sql.Timestamp

scala> val levels = Seq(
     |   // (year, month, dayOfMonth, hour, minute, second)
     |   ((2012, 12, 12, 12, 12, 12), 5),
     |   ((2012, 12, 12, 12, 12, 14), 9),
     |   ((2012, 12, 12, 13, 13, 14), 4),
     |   ((2016, 8,  13, 0, 0, 0), 10),
     |   ((2017, 5,  27, 0, 0, 0), 15)).
     |   map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
     |   map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
     |   toDF("time", "level")
levels: org.apache.spark.sql.DataFrame = [time: timestamp, level: int]

scala>

scala> levels.show
+-------------------+-----+
|               time|level|
+-------------------+-----+
|2012-12-12 12:12:12|    5|
|2012-12-12 12:12:14|    9|
|2012-12-12 13:13:14|    4|
|2016-08-13 00:00:00|   10|
|2017-05-27 00:00:00|   15|
+-------------------+-----+


scala>

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-datetime.html

Saturday, October 12, 2019

Spark Unit Test First example using scalatest

package com.chapter16.SparkTesting
import org.scalatest.{ BeforeAndAfterAll, FunSuite }
import org.scalatest.Assertions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
/**
  * ScalaTest suite that runs against a local SparkSession created once for the whole suite.
  */
class wordCountTest2 extends FunSuite with BeforeAndAfterAll {
  // Assigned in beforeAll(); stays null if session creation fails.
  var spark: SparkSession = null

  /**
    * Splits every line on single spaces and collects the result to the driver.
    *
    * @param line RDD of text lines
    * @return one array of tokens per input line
    */
  def tokenize(line: RDD[String]): Array[Array[String]] =
    line.map(_.split(' ')).collect()

  /** Creates the shared local SparkSession before any test runs. */
  override def beforeAll(): Unit = {
    super.beforeAll()
    spark = SparkSession
      .builder
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "E:/Exp/")
      .appName(s"OneVsRestExample")
      .getOrCreate()
  }

  test("Test if two RDDs are equal") {
    val input = List("To be,", "or not to be:", "that is the question-", "William Shakespeare")
    val expected = Array(Array("To", "be,"), Array("or", "not", "to", "be:"), Array("that", "is", "the", "question-"), Array("William", "Shakespeare"))
    val transformed = tokenize(spark.sparkContext.parallelize(input))
    assert(transformed === expected)
  }

  test("Test for word count RDD") {
    val fileName = "C:/Users/rezkar/Downloads/words.txt"
    val obj = new wordCountRDD
    val result = obj.prepareWordCountRDD(fileName, spark)
    assert(result.count() === 214)
  }

  /** Stops the shared session; guarded so a failed beforeAll() does not cause an NPE here. */
  override def afterAll(): Unit = {
    try {
      if (spark != null) spark.stop()
    } finally {
      super.afterAll()
    }
  }
}

Tuesday, October 8, 2019

Creating Dataframe of structtype

val dfLocal2 = Seq(
      (1,"PRESCHOOL", true, "basan", "11", "asdas", "3", "LINES"),
      (2, null,        false, "basan", "32", "asfasf", "13", "/COSMETICS"),
      (3,"FURNITURE",         null, "basan", "23", "asfasf HOME", "4", "HOME")

    ).toDF("id_int_null","string_null","boolean_null","TCIN","DIVISION_ID","DIVISION_NAME","GROUP_ID","GROUP_NAME")
dfLocal2.show()
dfLocal2.printSchema()



___


import org.apache.spark.sql.Row
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val someData = Seq(
  Row(8, "bat"),
  Row(64, "mouse"),
  Row(-27, "horse"),
  Row(null,"abc")
)

val someSchema = List(
  StructField("number", IntegerType, true),
  StructField("word", StringType, true)
)

val someDF = spark.createDataFrame(
  sc.parallelize(someData),
  StructType(someSchema)
)

someDF.show(6)

someDF.show()

Try Success failure on RDD operation

import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.Row
     
     
      val rdd= spark.sparkContext.parallelize(Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000)))

        val goodBadRecords = rdd.map(line =>
          Try{
            print(line)
         
          } match {
            case Success(map) => Right(map)
            case Failure(e) => Left(e)
          }

        )

        val records = goodBadRecords.filter(_.isRight)
        val errors = goodBadRecords.filter(_.isLeft)
       
       
        records.count()
        errors.count()

Monday, October 7, 2019

create Dstream manually using REPL

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Sample input. Named differently from the queue below so the snippet also works
// outside the REPL, where defining `lines` twice is a compile error.
val sentences = Seq("To be or not to be.", "That is the question.")

val rdd = sc.parallelize(sentences)

// Queue of RDDs that feeds the DStream created below.
val lines = mutable.Queue[RDD[String]]()

val streamingContext = new StreamingContext(sc, Seconds(10))
val dstream = streamingContext.queueStream(lines)

// append data to DStream
lines += rdd


http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/

Add fixed value column in dataframe

scala> val dfLocal = Seq(
     |              ("DEPARTMENT_ID1","DEPARTMENT_NAME1","GROUP_NAME1"),
     |
     |               ("1","PRESCHOOL",      "asdasf"),
     |               ("2","COSMETICS",              "COSMETICS"),
     |               ("3","FURNITURE",           "HOME")
     |             ).toDF("DEPARTMENT_ID","DEPARTMENT_NAME","GROUP_NAME")
dfLocal: org.apache.spark.sql.DataFrame = [DEPARTMENT_ID: string, DEPARTMENT_NAME: string ... 1 more field]

scala> dfLocal.printSchema()
root
 |-- DEPARTMENT_ID: string (nullable = true)
 |-- DEPARTMENT_NAME: string (nullable = true)
 |-- GROUP_NAME: string (nullable = true)


scala> import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.functions.typedLit

scala> val successDataSet2 = dfLocal.withColumn("newcolumnadded", lit("204"))
successDataSet2: org.apache.spark.sql.DataFrame = [DEPARTMENT_ID: string, DEPARTMENT_NAME: string ... 2 more fields]

scala> successDataSet2.show(2)
+--------------+----------------+-----------+--------------+
| DEPARTMENT_ID| DEPARTMENT_NAME| GROUP_NAME|newcolumnadded|
+--------------+----------------+-----------+--------------+
|DEPARTMENT_ID1|DEPARTMENT_NAME1|GROUP_NAME1|           204|
|             1|       PRESCHOOL|     asdasf|           204|
+--------------+----------------+-----------+--------------+
only showing top 2 rows


scala> successDataSet2.printSchema()
root
 |-- DEPARTMENT_ID: string (nullable = true)
 |-- DEPARTMENT_NAME: string (nullable = true)
 |-- GROUP_NAME: string (nullable = true)
 |-- newcolumnadded: string (nullable = false)


scala>

Sunday, October 6, 2019

De dupe in Dataframe

val df = Seq(
  ("d1", "2018-09-20 10:00:00", "blah1"),
  ("d2", "2018-09-20 09:00:00", "blah2"),
  ("d1", "2018-09-20 10:01:00", "blahnew")
).toDF("documentId","timestamp","anotherField")

import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"documentId").orderBy($"timestamp".desc)
val Resultdf = df.withColumn("rownum", row_number.over(w))
     .where($"rownum" === 1).drop("rownum")

Resultdf.show()
input:

+----------+-------------------+------------+
|documentId|          timestamp|anotherField|
+----------+-------------------+------------+
|        d1|2018-09-20 10:00:00|       blah1|
|        d2|2018-09-20 09:00:00|       blah2|
|        d1|2018-09-20 10:01:00|     blahnew|
+----------+-------------------+------------+
output:

+----------+-------------------+------------+
|documentId|          timestamp|anotherField|
+----------+-------------------+------------+
|        d2|2018-09-20 09:00:00|       blah2|
|        d1|2018-09-20 10:01:00|     blahnew|
+----------+-------------------+------------+

Converting string date value to Date object

scala> import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.IntegerType

scala> import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StringType

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructField

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala>     val data = Seq(("Java", 20000 , "2012-02-05"), ("Python", 100000 , "2012-02-06"), ("Scala", 3000 , "2012-02-07"))
data: Seq[(String, Int, String)] = List((Java,20000,2012-02-05), (Python,100000,2012-02-06), (Scala,3000,2012-02-07))

scala>     val tableColumns = List(("language", "string"), ("people", "integer"), ("registerdate", "String"))
tableColumns: List[(String, String)] = List((language,string), (people,integer), (registerdate,String))

scala>     var schema = new StructType
schema: org.apache.spark.sql.types.StructType = StructType()

scala>     for (i <- tableColumns)
     |       schema = schema.add(i._1, i._2)

scala>       val rowData = data.map(attributes => Row(attributes._1, attributes._2 ,  attributes._3))
rowData: Seq[org.apache.spark.sql.Row] = List([Java,20000,2012-02-05], [Python,100000,2012-02-06], [Scala,3000,2012-02-07])

scala>       var dfFromData4 = spark.createDataFrame(rowData,schema)
dfFromData4: org.apache.spark.sql.DataFrame = [language: string, people: int ... 1 more field]

scala>       dfFromData4.show(2)
+--------+------+------------+
|language|people|registerdate|
+--------+------+------------+
|    Java| 20000|  2012-02-05|
|  Python|100000|  2012-02-06|
+--------+------+------------+
only showing top 2 rows


scala>

scala>         import org.apache.spark.sql.functions.to_date
import org.apache.spark.sql.functions.to_date

scala>         val dateFormat = "yyyy-dd-MM"
dateFormat: String = yyyy-dd-MM

scala>        var dfFromData5 = dfFromData4.withColumn("registerdate2", to_date(col("registerdate"), dateFormat))
dfFromData5: org.apache.spark.sql.DataFrame = [language: string, people: int ... 2 more fields]

scala>        dfFromData5.show(3)
+--------+------+------------+-------------+
|language|people|registerdate|registerdate2|
+--------+------+------------+-------------+
|    Java| 20000|  2012-02-05|   2012-05-02|
|  Python|100000|  2012-02-06|   2012-06-02|
|   Scala|  3000|  2012-02-07|   2012-07-02|
+--------+------+------------+-------------+


scala> dfFromData5.printSchema()
root
 |-- language: string (nullable = true)
 |-- people: integer (nullable = true)
 |-- registerdate: string (nullable = true)
 |-- registerdate2: date (nullable = true)

Using Date Format in dataframe


scala> import org.apache.spark.sql.functions.to_date

import org.apache.spark.sql.functions.to_date

scala> val dateFormat = "yyyy-dd-MM"
dateFormat: String = yyyy-dd-MM

scala> val cleanDateDF = spark.range(1).select(
     | to_date(lit("2017-12-11"), dateFormat).alias("date"),
     | to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF: org.apache.spark.sql.DataFrame = [date: date, date2: date]

scala> cleanDateDF.show(2)
+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+





Spark the Definitive Guide is having good details about manipulating Hadoop

Converting String field to Date in Dataframe


val sourceDF = spark.createDF(
  List(
    (1, "2013-01-30"),
    (2, "2012-01-01")
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", StringType, true)
  )
).withColumn(
  "birth_date",
  col("birth_date").cast("date")
)

sourceDF.show()
sourceDF.printSchema()


https://mungingdata.com/apache-spark/dates-times/

Saturday, October 5, 2019

Converting Dataframe String type to Integer

acbc32a57245:bin z002qhl$ ./spark-shell
19/10/06 12:05:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.2:4040
Spark context available as 'sc' (master = local[*], app id = local-1570343764090).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.1
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.IntegerType

scala> import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StringType

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructField

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala>     val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
data: Seq[(String, String)] = List((Java,20000), (Python,100000), (Scala,3000))

scala>     val tableColumns = List(("order_number", "string"), ("external_order_number", "string"))
tableColumns: List[(String, String)] = List((order_number,string), (external_order_number,string))

scala>     var schema = new StructType
schema: org.apache.spark.sql.types.StructType = StructType()

scala>     for (i <- tableColumns)
     |       schema = schema.add(i._1, i._2)

scala>       val rowData = data.map(attributes => Row(attributes._1, attributes._2))
rowData: Seq[org.apache.spark.sql.Row] = List([Java,20000], [Python,100000], [Scala,3000])

scala>       var dfFromData4 = spark.createDataFrame(rowData,schema)
dfFromData4: org.apache.spark.sql.DataFrame = [order_number: string, external_order_number: string]

scala>       dfFromData4.show(2)
+------------+---------------------+
|order_number|external_order_number|
+------------+---------------------+
|        Java|                20000|
|      Python|               100000|
+------------+---------------------+
only showing top 2 rows


scala>

scala> val dfFromData5 = dfFromData4.withColumn("external_order_number", 'external_order_number cast "int")
dfFromData5: org.apache.spark.sql.DataFrame = [order_number: string, external_order_number: int]

scala> dfFromData5.printSchema
root
 |-- order_number: string (nullable = true)
 |-- external_order_number: integer (nullable = true)

---------

import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row
import scala.collection.JavaConversions._
    val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
    val tableColumns = List(("order_number", "string"), ("external_order_number", "string"))
    var schema = new StructType
    for (i <- p="" tablecolumns="">      schema = schema.add(i._1, i._2)
      val rowData = data.map(attributes => Row(attributes._1, attributes._2))
      var dfFromData4 = spark.createDataFrame(rowData,schema)
      dfFromData4.show(2)


val dfFromData6 =dfFromData4.withColumn("external_order_number", dfFromData4("external_order_number").cast("int"))
<- p="" tablecolumns="">
dfFromData6.printSchema

Message to RDD to Dataframe example

// Walk-through of several ways to build a DataFrame from the same sample data.
// Brings the toDF conversions used below into scope.
import spark.implicits._
val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
// Turn the local Seq of (String, String) tuples into an RDD.
val rdd = spark.sparkContext.parallelize(data)
// 1) toDF() with no column names passed.
val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
// 2) toDF with explicit column names.
// NOTE(review): this redefines dfFromRDD1. That works when entered statement by
// statement in the REPL, but a second `val` with the same name does not compile
// inside a single compilation unit.
val dfFromRDD1 = rdd.toDF("language","users_count")
dfFromRDD1.printSchema()

// 3) createDataFrame on the RDD, then name the columns from the `columns` Seq.
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)

import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row
// 4) Explicit schema: one StringType field per entry in `columns`, applied to
//    an RDD whose tuples have been wrapped in Rows.
val schema = StructType(columns
  .map(fieldName => StructField(fieldName, StringType)))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD,schema)

// 5) Skip the RDD and call toDF directly on the local Seq.
import spark.implicits._
val dfFromData1 = data.toDF()


import scala.collection.JavaConversions._

// 6) Local Seq of Rows plus the explicit schema from step 4.
// NOTE(review): presumably the JavaConversions import above is what allows the
// Scala Seq[Row] to be passed to createDataFrame here -- confirm.
val rowData = data.map(attributes => Row(attributes._1, attributes._2))
var dfFromData4 = spark.createDataFrame(rowData,schema)

Message to Dataframe using Schema

acbc32a57245:bin z002qhl$ ./spark-shell
19/10/06 11:40:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.2:4040
Spark context available as 'sc' (master = local[*], app id = local-1570342289541).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.1
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.IntegerType

scala> import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StringType

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructField

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala>     val data = Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000))
data: Seq[(String, Int)] = List((Java,20000), (Python,100000), (Scala,3000))

scala>     val tableColumns = List(("order_number", "string"), ("external_order_number", "integer"))
tableColumns: List[(String, String)] = List((order_number,string), (external_order_number,integer))

scala>     var schema = new StructType
schema: org.apache.spark.sql.types.StructType = StructType()

scala>     for (i <- tableColumns)
     |       schema = schema.add(i._1, i._2)

scala>       val rowData = data.map(attributes => Row(attributes._1, attributes._2))
rowData: Seq[org.apache.spark.sql.Row] = List([Java,20000], [Python,100000], [Scala,3000])

scala>       var dfFromData4 = spark.createDataFrame(rowData,schema)
dfFromData4: org.apache.spark.sql.DataFrame = [order_number: string, external_order_number: int]

scala>       dfFromData4.show(2)
+------------+---------------------+
|order_number|external_order_number|
+------------+---------------------+
|        Java|                20000|
|      Python|               100000|
+------------+---------------------+
only showing top 2 rows

creating dataframe in spark shell

acbc32a57245:bin z002qhl$ pwd
/Users/basan/Documents/Spark/spark-2.4.1-bin-hadoop2.7/bin
acbc32a57245:bin z002qhl$ ./spark-s
spark-shell   spark-sql     spark-submit

acbc32a57245:bin basan$ ./spark-shell
19/10/06 08:43:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.4:4040
Spark context available as 'sc' (master = local[*], app id = local-1570331637922).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.1
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val data = List(("James ","","Smith","36636","M",60000),
     |         ("Michael ","Rose","","40288","M",70000),
     |         ("Robert ","","Williams","42114","",400000),
     |         ("Maria ","Anne","Jones","39192","F",500000),
     |         ("Jen","Mary","Brown","","F",0))
data: List[(String, String, String, String, String, Int)] = List(("James ","",Smith,36636,M,60000), ("Michael ",Rose,"",40288,M,70000), ("Robert ","",Williams,42114,"",400000), ("Maria ",Anne,Jones,39192,F,500000), (Jen,Mary,Brown,"",F,0))

scala> val cols = Seq("first_name","middle_name","last_name","dob","gender","salary")
cols: Seq[String] = List(first_name, middle_name, last_name, dob, gender, salary)

scala> val df = spark.createDataFrame(data).toDF(cols:_*)
df: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 4 more fields]

scala> df.show(2)
+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|  dob|gender|salary|
+----------+-----------+---------+-----+------+------+
|    James |           |    Smith|36636|     M| 60000|
|  Michael |       Rose|         |40288|     M| 70000|
+----------+-----------+---------+-----+------+------+
only showing top 2 rows


scala> val df4 = df.select(col("*"),
     |       expr("case when gender = 'M' then 'Male' " +
     |                        "when gender = 'F' then 'Female' " +
     |                        "else 'Unknown' end").alias("new_gender"))
df4: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 5 more fields]

scala> df4.show(2)
+----------+-----------+---------+-----+------+------+----------+
|first_name|middle_name|last_name|  dob|gender|salary|new_gender|
+----------+-----------+---------+-----+------+------+----------+
|    James |           |    Smith|36636|     M| 60000|      Male|
|  Michael |       Rose|         |40288|     M| 70000|      Male|
+----------+-----------+---------+-----+------+------+----------+
only showing top 2 rows


scala> val df3 = df.withColumn("new_gender2",
     |       expr("case when gender = 'M' then 'Male' " +
     |                        "when gender = 'F' then 'Female' " +
     |                        "else 'Unknown' end"))
df3: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 5 more fields]

scala> df3.show(2)
+----------+-----------+---------+-----+------+------+-----------+
|first_name|middle_name|last_name|  dob|gender|salary|new_gender2|
+----------+-----------+---------+-----+------+------+-----------+
|    James |           |    Smith|36636|     M| 60000|       Male|
|  Michael |       Rose|         |40288|     M| 70000|       Male|
+----------+-----------+---------+-----+------+------+-----------+
only showing top 2 rows


scala>

scala> val df3 = df.withColumn("geneder",
     |       expr("case when gender = 'M' then 'Male' " +
     |                        "when gender = 'F' then 'Female' " +
     |                        "else 'Unknown' end"))
df3: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 5 more fields]

scala> df3.show(2)
+----------+-----------+---------+-----+------+------+-------+
|first_name|middle_name|last_name|  dob|gender|salary|geneder|
+----------+-----------+---------+-----+------+------+-------+
|    James |           |    Smith|36636|     M| 60000|   Male|
|  Michael |       Rose|         |40288|     M| 70000|   Male|
+----------+-----------+---------+-----+------+------+-------+
only showing top 2 rows


scala>

scala> val df3 = df.withColumn("gender",
     |       expr("case when gender = 'M' then 'Male' " +
     |                        "when gender = 'F' then 'Female' " +
     |                        "else 'Unknown' end"))
df3: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 4 more fields]

scala> df3.show(2)
+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|  dob|gender|salary|
+----------+-----------+---------+-----+------+------+
|    James |           |    Smith|36636|  Male| 60000|
|  Michael |       Rose|         |40288|  Male| 70000|
+----------+-----------+---------+-----+------+------+
only showing top 2 rows


Wednesday, October 2, 2019

Add jar to spark classpath

scala> :require /Users/basan/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.2.2.jar
Added '/Users/basan/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.2.2.jar' to classpath.


scala> import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecord

scala>      val record1 : ConsumerRecord[String, String] = new ConsumerRecord("topic1",0,100,"Key1", "value1")
record1: org.apache.kafka.clients.consumer.ConsumerRecord[String,String] = ConsumerRecord(topic = topic1, partition = 0, offset = 100, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = Key1, value = value1)

scala>     val record2 : ConsumerRecord[String, String] = new ConsumerRecord("topic1",0,100,"Key2", "value2")
record2: org.apache.kafka.clients.consumer.ConsumerRecord[String,String] = ConsumerRecord(topic = topic1, partition = 0, offset = 100, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = Key2, value = value2)

scala>     record1.value()
res0: String = value1