Powered By Blogger

Monday, August 10, 2020

Things to consider while buying Residential Plots

1. The site should be proper rectangle/square with the only exception there is an extension in the northeast side. Avoid buying sites with any other shapes.

2. Prefer to buy  40*60feet dimension, if not at least 30*50 feet, Any width higher than 30feet is good though it is difficult to get that kind of plots.

3. Buy only the sites which are having east-facing or north facing.

4. The site should have red soil. Some people might have just poured red soil in the plot for a couple of feet, make sure you dig it for a couple of feet and see.

5. Always try to get history/documents about the plot you are trying to buy and get it verified from a trustworthy/knowledgeable lawyer.

6. If allowed you can check whether you have underground water or not as well, the preferred direction is North-East corner of the plot.

7. It is better to interact directly with the owner while buying plots.

8. Check the map of the area to avoid cremation centers, garbage dump areas, etc.

9. Suggested buy land from the owner in the group, so that you will have the strength to fight any problems.

10. If someone is giving a good offer, if you book the site on the same day then that is the TRAP, always take time to think and do your research.

Tuesday, June 9, 2020

Spark Concepts:

Spark Concepts:

HashAggregate

Stage
Resource allocation dependents on number of messages/files
we would like to process.

Though even if you allocate 16 cores. if there is file 100KB
then only 1 task will run. Look at the launch time to see
how many threads/tasks are running in parallel.

Always look at Tasks - these are threads.



* is an optimization


Exchange:Shuffle is costly operation. Should avoid as much as possible.
HashPartitioning : matching records will be brought to same machine.
So that each individual can work independently.
Its costly as io is involved,


Smortmergejoin
rabgejoin.

We can givehints.

sparksession.conf.set



Monday, May 4, 2020

Filter data in spark using isin

scala> :paste
// Entering paste mode (ctrl-D to finish)

import java.sql.Date
import org.apache.spark.sql._

    import spark.implicits._
    val inputDF :DataFrame = Seq(
      ("50104044", "order1", "orderline11" , "2020-03", Date.valueOf("2020-03-17")),
      ("50104279", "order2", "orderline21","2020-03", Date.valueOf("2020-03-18")),
      ("50104279", "order22", "orderline221","2020-03", Date.valueOf("2020-03-19")),
      ("50102271", "order3", "orderline31","2020-02", Date.valueOf("2020-02-11")),
      ("50104279", "order4", "orderline41","2020-02", Date.valueOf("2020-02-11")),
      ("50104279", "order5", "orderline51","2020-01", Date.valueOf("2020-01-11"))

    ).toDF("key",
      "order_number",
      "order_line_key",
      "order_mon",
      "order_date")

      val items = List("order1", "order2", "order3")

      val filterdf = inputDF.filter($"order_number".isin(items:_*))
      filterdf.show(false)

// Exiting paste mode, now interpreting.

+--------+------------+--------------+---------+----------+
|key     |order_number|order_line_key|order_mon|order_date|
+--------+------------+--------------+---------+----------+
|50104044|order1      |orderline11   |2020-03  |2020-03-17|
|50104279|order2      |orderline21   |2020-03  |2020-03-18|
|50102271|order3      |orderline31   |2020-02  |2020-02-11|
+--------+------------+--------------+---------+----------+

import java.sql.Date
import org.apache.spark.sql._
import spark.implicits._
inputDF: org.apache.spark.sql.DataFrame = [key: string, order_number: string ... 3 more fields]
items: List[String] = List(order1, order2, order3)
filterdf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: string, order_number: string ... 3 more fields]

scala>

Sunday, April 19, 2020

Fill Substitute null with values in spark Dataframe


import spark.implicits._
val sourceDF = Seq(
           ("2019-12-09", "11", 1, "L1", "I11", "2018"),
           ("2019-12-09", "11", 2, "L2", "I10", "2018"),
           ("2019-12-09", "11", 3, "L3", "I4", "2018"),
           ("2019-12-09", "11", 4, "L4", "I4", "2015"),
           ("2019-12-09", "11", 5, "L5", "I4", "2019"),
           ("2019-12-09", "11", 6, "L6", null, "2014"),
           ("2019-12-09",  null, 7, "L7", null, "2013")

         ).toDF("date", "hour", "order", "line", "item", "time")

sourceDF.show(false)
val filledDF = sourceDF.na.fill("-99999",Seq("hour","item"))
filledDF.show(false)



scala> sourceDF.show(false)
+----------+----+-----+----+----+----+
|date      |hour|order|line|item|time|
+----------+----+-----+----+----+----+
|2019-12-09|11  |1    |L1  |I11 |2018|
|2019-12-09|11  |2    |L2  |I10 |2018|
|2019-12-09|11  |3    |L3  |I4  |2018|
|2019-12-09|11  |4    |L4  |I4  |2015|
|2019-12-09|11  |5    |L5  |I4  |2019|
|2019-12-09|11  |6    |L6  |null|2014|
|2019-12-09|null|7    |L7  |null|2013|
+----------+----+-----+----+----+----+


scala> val filledDF = sourceDF.na.fill("-99999",Seq("hour","item"))
filledDF: org.apache.spark.sql.DataFrame = [date: string, hour: string ... 4 more fields]


scala> filledDF.show(false)
+----------+------+-----+----+------+----+
|date      |hour  |order|line|item  |time|
+----------+------+-----+----+------+----+
|2019-12-09|11    |1    |L1  |I11   |2018|
|2019-12-09|11    |2    |L2  |I10   |2018|
|2019-12-09|11    |3    |L3  |I4    |2018|
|2019-12-09|11    |4    |L4  |I4    |2015|
|2019-12-09|11    |5    |L5  |I4    |2019|
|2019-12-09|11    |6    |L6  |-99999|2014|
|2019-12-09|-99999|7    |L7  |-99999|2013|
+----------+------+-----+----+------+----+


scala>

Friday, April 17, 2020

External table access via spark partition drop

For the external table in hive if you are adding or dropping partitions and if you try to access the table it will give vertex failure issue.

MSCK will not fix it
MSCK REPAIR TABLE basan.shipment_ext;

To fix this drop the partitions.

show partitions basan.shipment_ext;
//dropping single partition
ALTER TABLE basan.shipment_ext DROP IF EXISTS PARTITION(order_date ='2019-07-15')
//dropping multiple partition
ALTER TABLE basan.shipment_ext DROP IF EXISTS PARTITION(order_date >'2018-07-15')

Saturday, April 11, 2020

Spark with mysql database(spark jdbc)

/install the mysql server 8.0.19 and create the schema basan and the table shipment

CREATE TABLE `shipment` (
  `order_number` text,
  `order_date` text,
  `ship_status` text
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;


  1.make sure jar version is mysql-connector-java-8.0.19.jar and spark version in 2.3.2
  2. copy mysql-connector-java-8.0.19.jar jar to the location /jars
  datanucleus-core-3.2.10.jar
  3. run spark-shell and runt he below commands to interact with jdbc
 
 val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/basan?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "shipment")
  .option("user", "root")
  .option("password", "root1234")
  .load()

import spark.implicits._

val someDF = Seq(
  ("ordernumberinsert1", "2020-02-05", "delivered"),
  ("ordernumberinsert2", "2020-02-05", "delivered"),
  ("ordernumberinsert3", "2020-02-05", "delivered")
).toDF("order_number", "order_date","ship_status")
someDF.write.mode("overwrite")
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/basan?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "shipment")
  .option("user", "root")
  .option("password", "root1234")
  .save()

Friday, March 27, 2020

Filtering set of values from the bigger dataframe

Filtering set of values from the bigger dataframe

import spark.implicits._
    val localFileDF = spark.read.orc("/tmp/Analysis-10")
        localFileDF.filter(localFileDF("key") === "1065551413044" )



var followinglist=List("1065551413044")
val df2 = localFileDF.filter(col("key").isin(followinglist:_*))

Wednesday, March 18, 2020

back to basic- sql left join producing duplicate records


CREATE TABLE `shipment` (
  `shipmentnumber` varchar(45) DEFAULT NULL,
  `order_number` varchar(45) DEFAULT NULL,
  `order_line` varchar(45) DEFAULT NULL,
  `quantity` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;



CREATE TABLE `order` (
  `order_number` varchar(45) DEFAULT NULL,
  `order_line` varchar(45) DEFAULT NULL,
  `prder_type` varchar(45) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;



INSERT INTO `test`.`shipment`
(`shipmentnumber`,
`order_number`,
`order_line`,
`quantity`)
VALUES
("shipmentnumber1",
"order1",
"orderline11",
2
);

insert into test.order values("order2","orderline21","ordertype2");
insert into test.order values("order1","orderline11","ordertype1");
insert into test.order values("order2","orderline21","ordertype3");


SELECT * FROM test.`order`;


# order_number, order_line, prder_type
order2, orderline21, ordertype2
order1, orderline11, ordertype1
order2, orderline21, ordertype3


'

select * from  `test`.`shipment`\

# shipmentnumber, order_number, order_line, quantity
shipmentnumber1, order1, orderline11, 1
shipmentnumber1, order2, orderline21, 1




select * from `test`.`shipment`  as shpmnt
 left join  `test`.`order` as ord
 on
shpmnt.order_number=ord.order_number
AND
shpmnt.order_line=ord.order_line

# shipmentnumber, order_number, order_line, quantity, order_number, order_line, prder_type
'shipmentnumber1', 'order2', 'orderline21', '1', 'order2', 'orderline21', 'ordertype2'
'shipmentnumber1', 'order1', 'orderline11', '1', 'order1', 'orderline11', 'ordertype1'
'shipmentnumber1', 'order2', 'orderline21', '1', 'order2', 'orderline21', 'ordertype3'

Observe since right table is having more records we get records more than left table
============

left table also having duplicates

select * from  `test`.`shipment`

Have added duplicate record in the left table

# shipmentnumber, order_number, order_line, quantity
shipmentnumber1, order1, orderline11, 1
shipmentnumber1, order2, orderline21, 1
shipmentnumber1, order1, orderline11, 2


SELECT * FROM test.`order`;

# order_number, order_line, prder_type
order2, orderline21, ordertype2
order1, orderline11, ordertype1
order2, orderline21, ordertype3

# shipmentnumber, order_number, order_line, quantity, order_number, order_line, prder_type
shipmentnumber1, order1, orderline11, 1, order1, orderline11, ordertype1
shipmentnumber1, order1, orderline11, 2, order1, orderline11, ordertype1
shipmentnumber1, order2, orderline21, 1, order2, orderline21, ordertype2
shipmentnumber1, order2, orderline21, 1, order2, orderline21, ordertype3

all the left table records will be retained, and from the right table cross product is happening

Monday, February 24, 2020

Adding column in dataframe with user defined value if the column is null

Add another column by checking the previous column value


scala> val df1 = Seq(
     |            (
     |              "track1",
     |              "2019-11-18T16:11:28",
     |              "linekey1",
     |              "item1",
     |              "123"
     |            ),
     |            (
     |              "track2",
     |              "2019-11-18T16:11:28",
     |              "linekey2",
     |              "item2",
     |              null
     |            )).toDF(
     |            "number",
     |            "source_modified_timestamp",
     |            "line_key",
     |            "item_no",
     |            "tracking_number"
     |          )
df1: org.apache.spark.sql.DataFrame = [number: string, source_modified_timestamp: string ... 3 more fields]

scala> df1.show
+------+-------------------------+--------+-------+---------------+
|number|source_modified_timestamp|line_key|item_no|tracking_number|
+------+-------------------------+--------+-------+---------------+
|track1|      2019-11-18T16:11:28|linekey1|  item1|            123|
|track2|      2019-11-18T16:11:28|linekey2|  item2|           null|
+------+-------------------------+--------+-------+---------------+


scala>     df1.withColumn("tracking_number_rm", when($"tracking_number".isNull, lit("-999999")).otherwise(lit($"tracking_number"))).show
+------+-------------------------+--------+-------+---------------+------------------+
|number|source_modified_timestamp|line_key|item_no|tracking_number|tracking_number_rm|
+------+-------------------------+--------+-------+---------------+------------------+
|track1|      2019-11-18T16:11:28|linekey1|  item1|            123|               123|
|track2|      2019-11-18T16:11:28|linekey2|  item2|           null|           -999999|
+------+-------------------------+--------+-------+---------------+------------------+


scala>         df1.withColumn("tracking_number_rm", when($"tracking_number".isNull, "-1111").otherwise($"tracking_number")).show
+------+-------------------------+--------+-------+---------------+------------------+
|number|source_modified_timestamp|line_key|item_no|tracking_number|tracking_number_rm|
+------+-------------------------+--------+-------+---------------+------------------+
|track1|      2019-11-18T16:11:28|linekey1|  item1|            123|               123|
|track2|      2019-11-18T16:11:28|linekey2|  item2|           null|             -1111|
+------+-------------------------+--------+-------+---------------+------------------+

Saturday, February 22, 2020

Group by considers null as seperate row


val simpleData = Seq(("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  )
  val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
  df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



    //using agg function
  df.groupBy("department")
    .agg(
      sum("salary").as("sum_salary"),
      avg("salary").as("avg_salary"),
      sum("bonus").as("sum_bonus"),
      max("bonus").as("max_bonus"))
    .show(false)

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |171000    |85500.0          |39000    |21000    |
+----------+----------+-----------------+---------+---------+


=====

val simpleData = Seq(("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar",null,"NY",91000,50,21000)
  )
  val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
  df.show()


  +-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar|      null|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+

Observe null is treated as seperate row

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|null      |91000     |91000.0          |21000    |21000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |80000     |80000.0          |18000    |18000    |
+----------+----------+-----------------+---------+---------+

Thursday, February 20, 2020

Loading Hive config in spark shell

-bash-4.2$ spark-shell --conf spark.sql.sources.partitionOverwriteMode=dynamic \
>        --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
>        --conf spark.sql.orc.impl=native \
>        --conf spark.sql.orc.enableVectorizedReader=true \
>        --conf spark.hadoop.hive.exec.max.dynamic.partitions=2550



Observe spark.hadoop.* is placed for all the hadoop configurations


If we pass  only --conf hive.exec.max.dynamic.partitions=2550 then this property will be ignored by spark-shell in 2.3.1 version


We can also pass memory config as below
spark-shell --driver-memory 10G --executor-memory 15G --executor-cores 8

Sunday, February 9, 2020

Loading local jar in spark-shell

Loading jar from local
-bash-4.2$ spark-shell --jars /home_dir/basan/dpp/pipelinecore_2.11-1.2.1.jar


-bash-4.2$ spark-shell --jars


Loading jar from internet for spark shell


-bash-4.2$ spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 

Loading jar from internet for spark submit


spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 --queue basanqueue --driver-memory 2G --num-executors 8 --executor-memory 14G --executor-cores 6 --class com.basan.CoreTableProcessor  dpp-assembly-1.0.jar application.conf 1 2

Saturday, January 11, 2020

Spark Learnings

Learning of spark
1. drop column has to be assigned to variable
    val validRecordsDroppedStatusColumnsDF = validRecordsDF.drop("value_vald_status")
      .drop("cat_value_vald_status")
   
2. columnsAsPerTableValidPartitionRecordsDF.write.format("orc").mode("overwrite")
          //.partitionBy("date_val","hour_val")
          .insertInto("basan.employee")

3. Setting variables of spark is different from setting conf of spark properties
      hive.exec.max.dynamic.partitions=2000

4.Creating the session which avoids deleting any of the partitions and interacts with Hive
 def getSparkSession(appName:String): SparkSession = {
    val wareHouseLocation = "/apps/hive/warehouse"
    val sparkSession = SparkSession.getActiveSession.getOrElse(SparkSession.builder
      .appName(appName)
      .config("spark.sql.sources.partitionOverwriteMode" , "dynamic")
      .config("hive.exec.dynamic.partition.mode", "nonstrict" )
      .config("spark.sql.orc.impl","native")
      .config("spark.sql.orc.enableVectorizedReader","true")
      .config("spark.sql.warehouse.dir", wareHouseLocation)
      .enableHiveSupport()
      .getOrCreate())
    logInfo("loaded sparkSession")
    sparkSession
  }


  5. Writing to table, observe we are passing orc
        dataframe.write.format("orc"").mode(SaveMode.Overwrite).insertInto("basan.tableName1")

6. loading table info , always assign the name
    val dataFrame: DataFrame = sparkSession.sql("SELECT * from basan.tableName1")
7. selecting columns

val VALID_RECORDS_WITH_PARTITION_FIELDS = Seq(
    "kafka_key",
    "partition",
    "offset",
    "timestamp",
    "topic",
    "load_date")
val failuresColumns = VALID_RECORDS_WITH_PARTITION_FIELDS.map(col)
    val FailuresReadDF = dataframe.select(failuresColumns:_*)



Wednesday, January 8, 2020

Spark Session creation to read the ORC files in Spark 2.3.2

val sparkSession: SparkSession = SparkSession.builder.appName("session creation basan")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.orc.impl", "native")
  .config("spark.sql.orc.enableVectorizedReader", "true").master("local[*]").getOrCreate()

Saturday, January 4, 2020

Map Inversion

val m = Map("1" -> List("a","b","c")
           ,"2" -> List("a","j","k")
           ,"3" -> List("a","c"))

m flatten {case(k, vs) => vs.map((_, k))} groupBy (_._1) mapValues {_.map(_._2)}
//res0: Map[String,Iterable[String]] = Map(j -> List(2), a -> List(1, 2, 3), b -> List(1), c -> List(1, 3), k -> List(2))
Flatten the Map into a collection of tuples. groupBy will create a new Map with the old values as the new keys. Then un-tuple the values by removing the key (previously value) elements.



Map of type Map[A, Seq[B]] to Map[B, Seq[A]], where each B in the new map is associated with every A in the old map for which the B was contained in A’s associated sequence.