
Saturday, November 23, 2019

Dynamic Partitioning in Hive

============================================================
Dynamic partitioning
============================================================
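By default dynamic partitioning is already enabled in this setup (see the session output below).

Check the existing values. Running set with only the property name prints the current value.
set hive.exec.dynamic.partition;
set hive.exec.dynamic.partition.mode;

Enable dynamic partitioning and switch to nonstrict mode.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

Note: the property name is hive.exec.dynamic.partition.mode. In the session below it was mistyped as "dyanamic", which is why Hive reports it as undefined.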


hive> set hive.exec.dynamic.partition
    > ;
hive.exec.dynamic.partition=true
hive> set hive.exec.dyanamic.partition.mode
    > ;
hive.exec.dyanamic.partition.mode is undefined
hive>

Dump data from normal table to partitioned table

//set the hive settings for dynamic partitioning
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

//create a table without any partitions and load data; observe that state is a regular column of the table
create table orders_no_partition
(
  id string,
  customer_id string,
  product_id string,
  quantity int,
  amount double,
  zipcode char(5),
  state char(2)
)
COMMENT 'Table with no partition applied'
row format delimited fields terminated by ',';



orders_CA.csv
o1, c1, p1, 1, 1.11, 90111,CA
o2, c2, p2, 1, 2.22, 90222,CA
o3, c3, p3, 1, 3.33, 90333,CA
o4, c4, p4, 1, 4.44, 90444,CA

orders_CT.csv
o10, c10, p10, 10, 10.11, 900111,CT
o20, c20, p20, 10, 20.22, 900222,CT
o30, c30, p30, 10, 30.33, 900333,CT
o40, c40, p40, 10, 40.44, 900444,CT


load data local inpath '/home/cloudera/Desktop/orders_CA.csv'
into table orders_no_partition;


load data local inpath '/home/cloudera/Desktop/orders_CT.csv'
into table orders_no_partition;

//create the partitioned table; state is not in the column list, it is declared in the partitioned by clause
create table orders_partition
(
  id string,
  customer_id string,
  product_id string,
  quantity int,
  amount double,
  zipcode char(5)
)partitioned by (state char(2));

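//load the data into the partitioned table from the non-partitioned table; the dynamic partition column (state) must come last in the select list, which select * satisfies here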
insert into table orders_partition partition(state) select * from orders_no_partition;

//observe that the contents are organised as per the partitioning strategy
[root@quickstart Desktop]# hdfs dfs -ls /user/hive/warehouse/basan.db/orders_partition
Found 2 items
drwxrwxrwx   - cloudera supergroup          0 2019-11-23 23:20 /user/hive/warehouse/basan.db/orders_partition/state=CA
drwxrwxrwx   - cloudera supergroup          0 2019-11-23 23:20 /user/hive/warehouse/basan.db/orders_partition/state=CT
[root@quickstart Desktop]#
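
The folder layout above can be pictured with a small sketch. This is plain Python, not Hive's implementation; the function name dynamic_partition and the three sample rows are assumptions made for illustration. It only shows the idea that the last selected column picks the folder and is not stored with the rest of the row.

```python
from collections import defaultdict

def dynamic_partition(rows, table_dir):
    """Group rows into partition folders keyed by the last column.

    The partition value ends up in the folder name (state=XX),
    the remaining columns are what would be written to the data files.
    """
    folders = defaultdict(list)
    for row in rows:
        *data, state = row
        folders[f"{table_dir}/state={state}"].append(tuple(data))
    return dict(folders)

# Illustrative rows shaped like orders_no_partition (state is the last column)
rows = [
    ("o1", "c1", "p1", 1, 1.11, "90111", "CA"),
    ("o2", "c2", "p2", 1, 2.22, "90222", "CA"),
    ("o10", "c10", "p10", 10, 10.11, "900111", "CT"),
]

layout = dynamic_partition(rows, "orders_partition")
for folder, contents in sorted(layout.items()):
    print(folder, len(contents))
```

One folder is produced per distinct state value, which is why the number of partitions is driven by the data and not by the query.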

Hive optimizations

Join column- Hive Optimizations - Hive session 6

For each join column, one MapReduce (MR) job will run.

To optimize, we have to reduce the number of joins and minimize the number of MR jobs.

Bucketing helps, as fewer records are scanned.

The join column will be the key of the mapper output.
In the reduce phase the results will be aggregated.

Hive acts as a wrapper that writes the MR job for us.
We can remove the reduce phase for certain queries. Such a join is called
a map join, as it does not require shuffling.


Inner join/common join : Only matching records from both the tables

left outer join : matching records + all records from left + padded with nulls

right outer join : matching records + all records from right + padded with nulls


full outer join : union of left outer join and right outer join
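
The four join types can be checked on a tiny data set. The sketch below is plain Python with a hypothetical join helper and made-up customers/orders rows; None plays the role of the null padding.

```python
def join(left, right, how="inner"):
    """Join two lists of (key, value) pairs; unmatched sides are padded with None."""
    out = []
    matched_right = set()
    for lk, lv in left:
        hits = [(i, rv) for i, (rk, rv) in enumerate(right) if rk == lk]
        for i, rv in hits:
            out.append((lk, lv, rv))
            matched_right.add(i)
        # left/full: keep left rows that found no partner
        if not hits and how in ("left", "full"):
            out.append((lk, lv, None))
    # right/full: keep right rows that found no partner
    if how in ("right", "full"):
        for i, (rk, rv) in enumerate(right):
            if i not in matched_right:
                out.append((rk, None, rv))
    return out

customers = [(1, "alice"), (2, "bob")]
orders = [(2, "o20"), (3, "o30")]

for how in ("inner", "left", "right", "full"):
    print(how, join(customers, orders, how))
```

The full outer result contains every row of the left outer and the right outer result, with the matching rows appearing once.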

Running example for the sections below: we have 2 tables, the left table is 10 MB and the right table is 100 GB.


============================================================
Join side Optimizations
Bucket Map join
Sort Merge Bucket join (SMB)
============================================================
map side join

The small table will go to all the machines.

The big table will be distributed across the machines, and on each machine
a full copy of the small (left) table will sit in memory.

Inner joins can be treated as map side joins if one table is small.
By default the small-table limit is about 25 MB (hive.mapjoin.smalltable.filesize = 25000000 bytes); the limit can be changed if needed.
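
A minimal sketch of the map side join idea, in plain Python rather than Hive internals. The names map_side_join, states and order_splits are assumptions for illustration; each inner list stands for the split of the big table that one mapper reads.

```python
def map_side_join(small_table, big_table_splits):
    """Inner join where every mapper holds the whole small table in memory."""
    results = []
    for split in big_table_splits:       # each split is handled by one mapper
        lookup = dict(small_table)       # the small table is copied to every mapper
        for key, big_value in split:
            if key in lookup:            # match is found locally, no reduce phase needed
                results.append((key, lookup[key], big_value))
    return results

states = [("CA", "California"), ("CT", "Connecticut")]
order_splits = [
    [("CA", "o1"), ("NY", "o5")],
    [("CT", "o10"), ("CA", "o2")],
]

joined = map_side_join(states, order_splits)
print(joined)
```

Because every mapper can answer the lookup on its own, no records have to be shuffled by join key.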


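left outer join : matching records + all records from the left table.
The small left table is copied to every machine, and each machine sees only
its part of the big right table. A machine cannot tell whether a left record
that has no match locally has a match on another machine, so it cannot decide
when to emit the non matching record. So a left outer join does not support
map side join when the left table is the small one.


right outer join : matching records + all records from the right table.
If the left table is small and the right is big, every right record is
handled on exactly one machine that holds the complete left table, so
a map side right outer join is possible.


full outer join : left outer join + right outer join. As the left outer join
part does not support map side join, full outer join does not support it either.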

============================================================
If the right table is the small one, the above scenarios are reversed: left outer join supports map side join and right outer join does not.
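
Why a left outer join breaks when the small left table is the broadcast one can be seen in a small sketch. It is plain Python with hypothetical names and made-up rows; each inner list is the part of the big right table that one mapper sees.

```python
def naive_left_outer_per_mapper(small_left, big_right_splits):
    """Each mapper runs a left outer join against only its own split of the right table."""
    out = []
    for split in big_right_splits:
        right_rows = {}
        for key, value in split:
            right_rows.setdefault(key, []).append(value)
        for key, left_value in small_left:
            if key in right_rows:
                out.extend((key, left_value, v) for v in right_rows[key])
            else:
                # This mapper cannot know whether another split holds a match.
                out.append((key, left_value, None))
    return out

small_left = [("CA", "California")]
big_right_splits = [[("CA", "o1")], [("CT", "o10")]]

rows = naive_left_outer_per_mapper(small_left, big_right_splits)
print(rows)
```

The second mapper emits a null-padded row for CA even though the first mapper found a match, so the combined output is wrong. Deciding this correctly needs all rows of a key in one place, which is what the reduce phase provides.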

============================================================
Bucket Map join
A map join expects one table to be small.
Bucket map join can work with 2 big tables.

Both the tables should be bucketed on the join column.

The number of buckets in one table should be an integral multiple of the number
of buckets in the other table.

Valid bucket counts (table 1, table 2):
2 2
2 4
2 6
3 6

Only one bucket at a time is loaded in memory.
If the counts are not integral multiples, the join will not be performant.

The same hash function must be used in both the tables; otherwise the system will
not know which bucket needs to be loaded.
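
The integral multiple rule can be illustrated with a few lines of plain Python. This is a sketch under assumptions: integer join keys, key modulo bucket count as the shared hash function, and the helper names bucket_of and matching_buckets are made up here.

```python
def bucket_of(key, num_buckets):
    """Shared bucketing function: both tables must use the same one."""
    return key % num_buckets

def matching_buckets(small_bucket, small_n, big_n):
    """Buckets of the table with more buckets that can hold keys from one bucket of the other table."""
    if big_n % small_n != 0:
        raise ValueError("bucket counts must be integral multiples")
    return [b for b in range(big_n) if b % small_n == small_bucket]

# With 2 and 4 buckets, bucket 0 of the first table pairs with buckets 0 and 2 of the second.
print(matching_buckets(0, 2, 4))
print(matching_buckets(1, 3, 6))

# Every key lands in a bucket that is paired with its bucket in the other table.
consistent = all(
    bucket_of(k, 4) in matching_buckets(bucket_of(k, 2), 2, 4)
    for k in range(100)
)
print(consistent)
```

With counts such as 2 and 3 there is no fixed pairing between buckets, which is why matching_buckets refuses them in this sketch.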


============================================================

SMB : Sort merge Bucket join

Tables can be big.
Both the tables should be bucketed on the join column.
The number of buckets in both the tables should be the same.

Data is sorted on the join column in both the tables.

As both sides are sorted, bucket i of one table is merged one to one with bucket i of the other.
The cost is the effort involved in sorting the data.
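
The merge step of a sort merge join can be sketched as below. It is plain Python, not Hive's code; sort_merge_join and the two sample buckets are assumptions for illustration. Both inputs stand for one pair of matching buckets, already sorted on the join key.

```python
def sort_merge_join(left, right):
    """Inner join of two lists of (key, value) pairs that are already sorted by key."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with each equal left row.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                out.extend((lk, left[i][1], right[r][1]) for r in range(j, j_end))
                i += 1
            j = j_end
    return out

left_bucket = [(1, "a"), (3, "c"), (3, "cc"), (7, "g")]
right_bucket = [(3, "x"), (5, "y"), (7, "z")]

merged = sort_merge_join(left_bucket, right_bucket)
print(merged)
```

Each side is read once from start to end, so neither bucket has to be held in memory as a hash table.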

============================================================

Hive :
Datawarehouse
Database and Datawarehouse difference.
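Data is stored in tables. The data is in HDFS, by default under /user/hive/warehouse

metastore - schema is stored
derby - will store metadata by default

managed table/ external table
Dropping a table : managed table - data and metadata will be deleted; external table - only the metadata is deleted, the data stays

tables can be loaded from files; the files can come from the local file system or from HDFS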


views
normalization and denormalization

define user defined functions


optimisations

structural optimisations
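partitioning : use when the column has few distinct values; each partition is a folder
bucketing : use when the column has many distinct values; each bucket is a file


Query optimisations :
use fewer joins
map side join : the small table has to fit into memory. The roles reverse when the other table is the small one
bucket map join : both tables bucketed on the join column; joining big tables is possible
SMB : the number of buckets should be the same and the data should be sorted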






Text analysis and small code




package com.basan.day3

import org.apache.spark._

object ErrorWarnCount {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext("local[*]", "ERROR count")

    val list: List[String] = List(
      "WARN : Tuesday 4 September 0408",
      "ERROR : Tuesday 4 September 0408",
      "ERROR : Tuesday 4 September 0408",
      "ERROR : Tuesday 4 September 0408")

    // Key each line by its log level (the text before the first ':')
    val originalRDD = sc.parallelize(list)
    val pairRdd = originalRDD.map(x => {
      val columns = x.split(":")
      // trim removes the space left in front of the ':' ("WARN " -> "WARN")
      val level = columns(0).trim
      (level, 1)
    })

    // Sum the 1s per level to get the count for each level
    val resultantRdd = pairRdd.reduceByKey((x, y) => x + y)
    resultantRdd.collect().foreach(println)

    println("single line conversion")
    // The same logic written as a single chained expression
    sc.parallelize(list).map(x => (x.split(":")(0).trim, 1)).reduceByKey(_ + _).collect().foreach(println)

    sc.stop()
  }

}

Broadcast join

Stages are separated by shuffle boundaries.

There are 2 kinds of transformations:
narrow transformations and wide transformations.

Narrow transformations : work on data locality. No shuffle is required.
map, filter


Wide transformations : a shuffle happens.
reduceByKey
groupByKey

Wide transformations take more time.
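
The difference can be pictured with partitions modelled as plain Python lists. This is only a sketch of the data movement, not Spark code; reduce_by_key and the byte-sum hash are assumptions chosen so the result is deterministic.

```python
from collections import defaultdict

partitions = [
    [("ERROR", 1), ("WARN", 1)],
    [("ERROR", 1), ("ERROR", 1)],
]

# Narrow: each output partition is computed from exactly one input partition.
mapped = [[(level.lower(), n) for level, n in part] for part in partitions]

def reduce_by_key(parts, num_out):
    """Wide: all values of a key must meet in one output partition, so records cross partitions."""
    shuffled = [defaultdict(int) for _ in range(num_out)]
    for part in parts:
        for key, n in part:
            target = sum(key.encode()) % num_out   # stand-in for a hash partitioner
            shuffled[target][key] += n
    return [dict(p) for p in shuffled]

reduced = reduce_by_key(mapped, 2)
print(mapped)
print(reduced)
```

In the map step the partition boundaries never change, while in reduce_by_key the "error" records from both input partitions have to end up in the same place.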

==================
Difference between repartition and coalesce

Getting the number of partitions:

rdd.getNumPartitions

To change the number of partitions we can use repartition or coalesce.

To increase the number of partitions, repartition has to be used.
To decrease the number of partitions, both repartition and coalesce can be used. Ideally go with coalesce for decreasing partitions.

coalesce : it will try to avoid shuffling by merging existing partitions.

A 1 GB file in HDFS creates 8 partitions, assuming a block size of 128 MB. Assume these are spread over 4 data nodes.

rdd.coalesce(4) - partitions on the same node can be merged, so no data has to move between nodes
rdd.coalesce(2) - fewer partitions than nodes, so data from some nodes has to move to others

We might have 1000s of blocks with very few records in each block;
then we coalesce to reduce the number of partitions.
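
A rough picture of the two operations, with partitions modelled as Python lists. This is a sketch of the idea only, not how Spark assigns partitions; the functions coalesce and repartition below are hypothetical stand-ins for the RDD methods of the same name.

```python
def coalesce(partitions, n):
    """Merge neighbouring partitions into n groups without redistributing individual records."""
    n = min(n, len(partitions))          # merging alone cannot increase the partition count
    out = [[] for _ in range(n)]
    for idx, part in enumerate(partitions):
        out[idx * n // len(partitions)].extend(part)
    return out

def repartition(partitions, n):
    """Redistribute every record across n partitions (a full shuffle)."""
    out = [[] for _ in range(n)]
    position = 0
    for part in partitions:
        for record in part:
            out[position % n].append(record)
            position += 1
    return out

parts = [[1, 2], [3], [4, 5, 6], [7]]
print(coalesce(parts, 2))
print(repartition(parts, 2))
print(len(coalesce(parts, 8)))
```

coalesce keeps whole partitions together, which is cheap but can leave uneven sizes; repartition touches every record and balances the output.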

======================================================
How many times each movie is watched.

Map side join : the smaller data set is kept on each machine.
======================================================

Map side join is nothing but broadcast join.


Broadcast join :
the smaller data set is broadcast to every executor and joined there.

Broadcast it on the driver side:
    // Create a broadcast variable of our ID -> movie name map
    val nameDict = sc.broadcast(loadMovieNames)


The broadcast map can be accessed through nameDict.value, for example nameDict.value(x._2):
    val sortedMoviesWithNames = sortedMovies.
      map(x => (nameDict.value(x._2), x._1))

Friday, November 22, 2019

Hive Optimization Techniques

==========================================
set operations

    Union, minus, intersect
    Only union is supported in older Hive versions such as the 1.x release used here; minus and intersect are not.
    We can still write a query (for example with joins) to achieve minus and intersect.

==========================================

Sub queries

    IN , NOT IN
    EXISTS , NOT EXISTS
In the where clause, only these 2 kinds of subquery are supported in Hive.

==========================================
Views
A view is a logical table.
It can expose a subset of the columns of a table.

create view viewname as (query)

Views give security, as users can be granted access to the view instead of the full table.
We can create multiple logical tables over the same data.

A view hides the complexity of the query.
The query is stored in the metastore.

==========================================
Normalization vs Denormalization

 Normalization : data should not be redundant.
 A bigger table is divided into multiple smaller tables with
 the intent to remove redundancy.

Normalization is good for transactional databases.


Denormalization : keep the data in one big table rather than in small tables.
It will have the issue of data redundancy.

It will help in reducing the number of joins.



Hive Basic Concepts:
    Datawarehouse
    Data
    metadata : metastore
    managed table, external table, temporary table
    loading data from local and from hdfs
    complex data types.
    UDF, UDAF, UDTF
    Views
    Normalization and deNormalization

==========================================

Hive Optimization techniques:
1. table structure level Optimization
        Partitioning and bucketing

2. Optimization of hive query

1. table structure level Optimization
    Logically segregate the data
    Partitioning and bucketing

Static and Dynamic Partitioning

We can have a hierarchy of folders in partitioning.
We don't have control over the number of partitions created.
Partition is the folder
==========================================
Bucketing tables :

How does bucketing work?
The number of buckets has to be fixed while creating the table.
Each bucket is a file.

The hash function used for inserting is also used when searching, to decide
which bucket to read.

Whenever we have a lot of distinct values we can use bucketing.
==========================================
We can use combination of partitioning and bucketing to organize the data.

Hive custom UDF and the built in functions


Complex data types
Arrays, map , struct

Arrays : collection of elements of the same type
map : key value pairs.
struct : similar to a class, or to a struct in C.
         more than one field with various data types. ex : Employee class
         logical grouping of data. A struct can contain another struct or a map


Built in functions :
functions given by the framework
UDF : User defined functions
    take one input row and produce one output row. ex: length , trim, concat , round, floor
UDAF :  User defined aggregate functions
    work on multiple rows and output a single row
    sum(), avg() , count(*)

UDTF : User defined table generating functions
    work on a single row and output multiple rows

    explode()
    posexplode()
These work only on complex data types.

We can explode an array or a map.

posexplode - also gives the position of each element after the explode.

select explode(subordinatelist) from table

To also get another column such as manager next to it:

select manager , explode(subordinatelist) from table will not work

The virtual table we get from explode is exposed through a lateral view. We should join the lateral view with the actual table to get all the columns (sub_view and subordinate are just alias names):

select manager, subordinate from table lateral view explode(subordinatelist) sub_view as subordinate
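
What explode, posexplode and the lateral view produce can be mimicked in plain Python. This is a sketch, not Hive code: the three functions are hypothetical stand-ins, the column names manager and subordinatelist come from the queries above, and the two table rows are made up.

```python
def explode(values):
    """One input row in, one output row per array element out."""
    for value in values:
        yield (value,)

def posexplode(values):
    """Like explode, but each output row also carries the element position (starting at 0)."""
    for pos, value in enumerate(values):
        yield (pos, value)

def lateral_view(rows, array_column, udtf):
    """Join every source row with the rows the UDTF generates from that row."""
    for row in rows:
        for generated in udtf(row[array_column]):
            yield (row["manager"],) + generated

table = [
    {"manager": "m1", "subordinatelist": ["s1", "s2"]},
    {"manager": "m2", "subordinatelist": ["s3"]},
]

exploded = list(lateral_view(table, "subordinatelist", explode))
with_pos = list(lateral_view(table, "subordinatelist", posexplode))
print(exploded)
print(with_pos)
```

The manager value is repeated for every generated row, which is the join between the source row and its lateral view.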


--------------------------------------------------
--------------------------------------------------
Load table from the path and create custom UDF


create table if not exists table2(word string,count int) row format delimited
fields terminated by ',' lines terminated by '\n' stored as textfile;

load data local inpath '/home/cloudera/Downloads/rankfunctions.txt' overwrite into table table2;

rankfunctions.txt
John,1500
Albert,1500
Mark,1000
Frank,1150
Loopa,1100
Lui,1300
John,1300
John,900
Lesa,1500
Lesa,900
Pars,800
leo,700
leo,1500
lock,650
Bhut,800
Lio,500


[cloudera@quickstart Downloads]$ vi rankfunctions.txt
[cloudera@quickstart Downloads]$ pwd
/home/cloudera/Downloads
[cloudera@quickstart Downloads]$ ls /home/cloudera/Downloads/rankfunctions.txt
/home/cloudera/Downloads/rankfunctions.txt
[cloudera@quickstart Downloads]$



[cloudera@quickstart Desktop]$ hive

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.
hive> show databases;
OK
basan
default
Time taken: 0.532 seconds, Fetched: 2 row(s)
hive> use basan
    > ;
OK
Time taken: 0.027 seconds
hive> create table if not exists table2(word string,count int) row format delimited
    > fields terminated by ',' lines terminated by '\n' stored as textfile;
OK
Time taken: 0.162 seconds
hive>

hive> load data local inpath '/home/cloudera/Downloads/rankfunctions.txt' overwrite into table table2;
Loading data to table basan.table2
Table basan.table2 stats: [numFiles=1, numRows=0, totalSize=153, rawDataSize=0]
OK
Time taken: 0.713 seconds
hive>

hive> select * from table2
    > ;
OK
John 1500
Albert 1500


Time taken: 0.266 seconds, Fetched: 16 row(s)
hive> select word from table2
    > ;
OK
John
Albert
Mark
Frank

Time taken: 0.09 seconds, Fetched: 16 row(s)
hive>


Now download the jar hive-exec-1.2.2.jar
https://mvnrepository.com/artifact/org.apache.hive/hive-exec/1.2.2


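Set up the project with the code below. The package name has to match the class name used in create temporary function (hiveUDF.DataStandardization).

package hiveUDF;

import org.apache.hadoop.hive.ql.exec.UDF;

// Simple Hive UDF that upper-cases its input; Hive calls evaluate() once per row.
public class DataStandardization extends UDF {

    public String evaluate(String input) {
        // Pass NULL values through instead of failing on them
        if (input == null) {
            return null;
        }
        return input.toUpperCase();
    }

}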



step 5: add the jar in hive
add jar /home/cloudera/Desktop/my_udf.jar;


Register the function with hive

step 6: create a function in hive
create temporary function f1 as 'hiveUDF.DataStandardization';
select f1(word) from table2;


hive> add jar /home/cloudera/Desktop/my_udf.jar;
Added [/home/cloudera/Desktop/my_udf.jar] to class path
Added resources: [/home/cloudera/Desktop/my_udf.jar]

hive> create temporary function f1 as 'hiveUDF.DataStandardization';
OK
Time taken: 0.007 seconds


hive> use basan
    > ;
OK
Time taken: 0.034 seconds
hive> select f1(word) from table2;
OK
JOHN
ALBERT
MARK
FRANK
LOOPA
LUI


------------------------------
Creating mountpoint in cloudera
------------------------------
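Create the folder on mac/windows which you want to share
and put a dummy file in it.

In the VM settings of the cloudera machine, add the folder as a shared folder (mount point).
The shared folder can then be mounted inside the VM as below
mount -t vboxsf <shared folder name> <mount directory>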

[cloudera@quickstart ~]$ pwd
/home/cloudera
[cloudera@quickstart ~]$ cd Desktop/
[cloudera@quickstart Desktop]$ mkdir Mount-virtaul
[cloudera@quickstart Desktop]$ su
Password:
[root@quickstart Desktop]# mount -t vboxsf Mount-virtaul Mount-virtaul
[root@quickstart Desktop]#




















Thursday, November 21, 2019

spark shell structured streaming with spark 2.3.2

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "basan-topic").
  option("kafka.security.protocol" , "SSL").
  option("kafka.bootstrap.servers", "kafka-basan.com:9093").
  option("kafka.ssl.truststore.location" , "/Users/basan/client.truststore.jks").
  option("kafka.ssl.truststore.password" ,  "changeit").
  option("kafka.ssl.keystore.location" , "/Users/basan/basan.com.jks").
  option("kafka.ssl.keystore.password" , "changeit").
  load




val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(50.seconds)).
  outputMode(OutputMode.Update).
  start

q.stop


Useful link
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html