
Saturday, November 30, 2019

DataFrame simple operations


package com.basan.day5.df

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import org.apache.spark.sql.Column

object LoggingWithCaseClass {

  case class Logging(level: String, datetime: String)

  def mapper(line: String): Logging = {
    val fields = line.split(",")
    val logging: Logging = Logging(fields(0), fields(1))
    logging
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Use new SparkSession interface in Spark 2.0
    val spark = SparkSession
      .builder
      .appName("SparkSQL")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
      .getOrCreate()

    import spark.implicits._

    val mylist = List(
      "WARN,2016-12-31 04:19:32",
      "FATAL,2016-12-31 03:19:32",
      "WARN,2016-12-31 04:19:32",
      "WARN,2016-12-31 04:19:31",
      "INFO,2016-12-31 04:19:32",
      "FATAL,2016-12-31 14:19:32")

    val rdd1 = spark.sparkContext.parallelize(mylist)
    val rdd2 = rdd1.map(mapper)

    val df1 = rdd2.toDF()
    df1.show()
    df1.createOrReplaceTempView("logging_table")

    spark.sql("Select * from logging_table ").show()
    //show(false) displays the full column values without truncation
    spark.sql("Select level , collect_list(datetime) from logging_table group by level order by level").show(false)

    spark.sql("Select level , count(datetime) from logging_table group by level order by level").show(false)
    
    
    spark.sql("select level, datetime from logging_table").show() 
    
    
    

  }

}
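The same aggregations can also be written with the DataFrame API instead of SQL. A minimal sketch, assuming the df1 DataFrame built above:

import org.apache.spark.sql.functions.{collect_list, count}

// collect all datetimes per level, like the SQL collect_list query
df1.groupBy("level")
  .agg(collect_list("datetime").alias("datetimes"))
  .orderBy("level")
  .show(false)

// count rows per level, like the SQL count query
df1.groupBy("level")
  .agg(count("datetime").alias("cnt"))
  .orderBy("level")
  .show(false)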






Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+-----+-------------------+
|level|           datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+

+-----+-------------------+
|level|           datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+

+-----+---------------------------------------------------------------+
|level|collect_list(datetime)                                         |
+-----+---------------------------------------------------------------+
|FATAL|[2016-12-31 03:19:32, 2016-12-31 14:19:32]                     |
|INFO |[2016-12-31 04:19:32]                                          |
|WARN |[2016-12-31 04:19:32, 2016-12-31 04:19:32, 2016-12-31 04:19:31]|
+-----+---------------------------------------------------------------+

+-----+---------------+
|level|count(datetime)|
+-----+---------------+
|FATAL|2              |
|INFO |1              |
|WARN |3              |
+-----+---------------+

+-----+-------------------+
|level|           datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+


Friday, November 29, 2019

Sqoop basics

sqoop

When we connect to Hadoop, we connect to an edge node, which internally
takes care of executing the jobs on the cluster.

Connecting to MySQL from Hadoop:

sqoop-list-databases \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
--password cloudera

[cloudera@quickstart ~]$ hdfs dfs -ls
Found 1 items
drwxr-xr-x   - cloudera cloudera          0 2019-11-03 03:58 _sqoop
[cloudera@quickstart ~]$ sqoop-list-databases \
> --connect "jdbc:mysql://quickstart.cloudera:3306" \
> --username retail_dba \
> --password cloudera
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 21:49:45 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 21:49:45 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 21:49:45 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
retail_db
[cloudera@quickstart ~]$



Now try to list the tables, to see all the tables in retail_db.


sqoop-list-tables \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username retail_dba \
--password cloudera


[cloudera@quickstart ~]$ sqoop-list-tables \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username retail_dba \
> --password cloudera
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 21:56:50 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 21:56:50 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 21:56:50 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
categories
customers
departments
order_items
orders
products
[cloudera@quickstart ~]$




sqoop-eval \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
--password cloudera \
--query "select * from retail_db.customers limit 10"

https://sqoop.apache.org/docs/1.4.4/SqoopUserGuide.html#_controlling_the_import_process

sqoop import \
-Dmapreduce.job.queuename=${job_queue_nm} \
-Dmapred.job.queuename=${job_queue_nm} \
--options-file ${goa_lkup_dir}/connect.txt \
--delete-target-dir \
--target-dir ${landingdir}/order_node_allocation_reason_e \
--hive-drop-import-delims \
--query "select nearest_node_json from order_node_allocation_reason \
where insert_time >= '$strt_ts'  and  \$CONDITIONS" \
--null-string '\\N' --null-non-string '\\N' \
--num-mappers ${nummapper} \
--fields-terminated-by '|' \
--lines-terminated-by '\n'
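--options-file reads Sqoop options from a file, one option or value per line (lines starting with # are comments). A hedged sketch of what a connect.txt options file could contain; the values are hypothetical:

# connect.txt - connection options read via --options-file
--connect
jdbc:mysql://quickstart.cloudera:3306/retail_db
--username
retail_dba
--password
cloudera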



Create a table in MySQL and insert some data:
CREATE TABLE people (PersonID int,LastName varchar(255),FirstName varchar(255),Address varchar(255),City varchar(255));

insert into people values(1,'patil', 'basan', 'address','bangalore');
insert into people values(2,'patil2', 'basan2', 'address2','bangalore2');
insert into people values(3,'patil3', 'basan3', 'address3','bangalore3');

commit;

mysql> insert into people values(1,'patil', 'basan', 'address','bangalore');
Query OK, 1 row affected (0.01 sec)

mysql> commit
    -> ;
Query OK, 0 rows affected (0.00 sec)

mysql>


Data can be sqooped from a table without a primary key, but then only a single mapper can be used (or a --split-by column must be given).
If the target directory of the Sqoop import already exists, the import will throw an error.


sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table people \
--target-dir /queryresult


[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
> --username root \
> --password cloudera \
> --table people \
> --target-dir /queryresult
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:15:26 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 22:15:26 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 22:15:26 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
19/11/29 22:15:26 INFO tool.CodeGenTool: Beginning code generation
19/11/29 22:15:26 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `people` AS t LIMIT 1
19/11/29 22:15:26 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `people` AS t LIMIT 1
19/11/29 22:15:26 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/22a1f58b27b214b8bc8a6b8727c10967/people.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
19/11/29 22:15:28 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/22a1f58b27b214b8bc8a6b8727c10967/people.jar
19/11/29 22:15:28 WARN manager.MySQLManager: It looks like you are importing from mysql.
19/11/29 22:15:28 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
19/11/29 22:15:28 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
19/11/29 22:15:28 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
19/11/29 22:15:28 ERROR tool.ImportTool: Import failed: No primary key could be found for table people. Please specify one with --split-by or perform a sequential import with '-m 1'.
[cloudera@quickstart ~]$


hdfs dfs -ls /queryresult


To fix this, pass -m 1:
sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table people \
-m 1 \
--target-dir /queryresult



[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
> --username root \
> --password cloudera \
> --table people \
> -m 1 \
> --target-dir /queryresult
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:17:58 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 22:17:58 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 22:17:58 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
19/11/29 22:17:58 INFO tool.CodeGenTool: Beginning code generation
19/11/29 22:17:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `people` AS t LIMIT 1
19/11/29 22:17:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `people` AS t LIMIT 1
19/11/29 22:17:59 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/aea02172d63d47af0d50fec188aa6c21/people.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
19/11/29 22:18:00 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/aea02172d63d47af0d50fec188aa6c21/people.jar
19/11/29 22:18:00 WARN manager.MySQLManager: It looks like you are importing from mysql.
19/11/29 22:18:00 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
19/11/29 22:18:00 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
19/11/29 22:18:00 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
19/11/29 22:18:00 INFO mapreduce.ImportJobBase: Beginning import of people
19/11/29 22:18:00 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
19/11/29 22:18:00 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
19/11/29 22:18:01 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
19/11/29 22:18:01 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/11/29 22:18:03 INFO db.DBInputFormat: Using read commited transaction isolation
19/11/29 22:18:03 INFO mapreduce.JobSubmitter: number of splits:1
19/11/29 22:18:03 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1572771724749_0035
19/11/29 22:18:04 INFO impl.YarnClientImpl: Submitted application application_1572771724749_0035
19/11/29 22:18:04 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1572771724749_0035/
19/11/29 22:18:04 INFO mapreduce.Job: Running job: job_1572771724749_0035
19/11/29 22:18:10 INFO mapreduce.Job: Job job_1572771724749_0035 running in uber mode : false
19/11/29 22:18:10 INFO mapreduce.Job:  map 0% reduce 0%
19/11/29 22:18:15 INFO mapreduce.Job:  map 100% reduce 0%
19/11/29 22:18:15 INFO mapreduce.Job: Job job_1572771724749_0035 completed successfully
19/11/29 22:18:15 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=171167
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=87
HDFS: Number of bytes written=104
HDFS: Number of read operations=4
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Other local map tasks=1
Total time spent by all maps in occupied slots (ms)=3008
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=3008
Total vcore-milliseconds taken by all map tasks=3008
Total megabyte-milliseconds taken by all map tasks=3080192
Map-Reduce Framework
Map input records=3
Map output records=3
Input split bytes=87
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=32
CPU time spent (ms)=850
Physical memory (bytes) snapshot=215085056
Virtual memory (bytes) snapshot=1573986304
Total committed heap usage (bytes)=235929600
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=104
19/11/29 22:18:15 INFO mapreduce.ImportJobBase: Transferred 104 bytes in 14.0751 seconds (7.3889 bytes/sec)
19/11/29 22:18:15 INFO mapreduce.ImportJobBase: Retrieved 3 records.
[cloudera@quickstart ~]$ hdfs dfs -ls /queryresult
Found 2 items
-rw-r--r--   1 cloudera supergroup          0 2019-11-29 22:18 /queryresult/_SUCCESS
-rw-r--r--   1 cloudera supergroup        104 2019-11-29 22:18 /queryresult/part-m-00000
[cloudera@quickstart ~]$

By default 4 mappers will be used, but if the table has no key
then only 1 mapper is used, so we see one file as the output.
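Re-running the same import would fail because /queryresult already exists. A hedged sketch adding --delete-target-dir so the command can be re-run (same table and directory as above):

sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table people \
-m 1 \
--delete-target-dir \
--target-dir /queryresult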


sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--target-dir /queryresultorders

hdfs dfs -ls /queryresultorders
hdfs dfs -cat /queryresultorders/*



[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
> --username root \
> --password cloudera \
> --table orders \
> --target-dir /queryresultorders
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:21:41 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 22:21:41 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 22:21:41 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
19/11/29 22:21:41 INFO tool.CodeGenTool: Beginning code generation
19/11/29 22:21:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
19/11/29 22:21:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
19/11/29 22:21:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/4038ef3cab402a8d089b3d928036f385/orders.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
19/11/29 22:21:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/4038ef3cab402a8d089b3d928036f385/orders.jar
19/11/29 22:21:43 WARN manager.MySQLManager: It looks like you are importing from mysql.
19/11/29 22:21:43 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
19/11/29 22:21:43 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
19/11/29 22:21:43 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
19/11/29 22:21:43 INFO mapreduce.ImportJobBase: Beginning import of orders
19/11/29 22:21:43 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
19/11/29 22:21:43 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
19/11/29 22:21:43 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
19/11/29 22:21:44 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/11/29 22:21:45 INFO db.DBInputFormat: Using read commited transaction isolation
19/11/29 22:21:45 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`order_id`), MAX(`order_id`) FROM `orders`
19/11/29 22:21:45 INFO db.IntegerSplitter: Split size: 17220; Num splits: 4 from: 1 to: 68883
19/11/29 22:21:45 INFO mapreduce.JobSubmitter: number of splits:4
19/11/29 22:21:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1572771724749_0036
19/11/29 22:21:45 INFO impl.YarnClientImpl: Submitted application application_1572771724749_0036
19/11/29 22:21:46 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1572771724749_0036/
19/11/29 22:21:46 INFO mapreduce.Job: Running job: job_1572771724749_0036
19/11/29 22:21:52 INFO mapreduce.Job: Job job_1572771724749_0036 running in uber mode : false
19/11/29 22:21:52 INFO mapreduce.Job:  map 0% reduce 0%
19/11/29 22:22:02 INFO mapreduce.Job:  map 25% reduce 0%
19/11/29 22:22:05 INFO mapreduce.Job:  map 50% reduce 0%
19/11/29 22:22:06 INFO mapreduce.Job:  map 75% reduce 0%
19/11/29 22:22:07 INFO mapreduce.Job:  map 100% reduce 0%
19/11/29 22:22:07 INFO mapreduce.Job: Job job_1572771724749_0036 completed successfully
19/11/29 22:22:07 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=685420
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=469
HDFS: Number of bytes written=2999944
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=36317
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=36317
Total vcore-milliseconds taken by all map tasks=36317
Total megabyte-milliseconds taken by all map tasks=37188608
Map-Reduce Framework
Map input records=68883
Map output records=68883
Input split bytes=469
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=335
CPU time spent (ms)=13480
Physical memory (bytes) snapshot=1053622272
Virtual memory (bytes) snapshot=6251585536
Total committed heap usage (bytes)=945291264
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=2999944
19/11/29 22:22:07 INFO mapreduce.ImportJobBase: Transferred 2.861 MB in 23.5627 seconds (124.3332 KB/sec)
19/11/29 22:22:07 INFO mapreduce.ImportJobBase: Retrieved 68883 records.
[cloudera@quickstart ~]$ hdfs dfs -ls /queryresultorders
Found 5 items
-rw-r--r--   1 cloudera supergroup          0 2019-11-29 22:22 /queryresultorders/_SUCCESS
-rw-r--r--   1 cloudera supergroup     741614 2019-11-29 22:22 /queryresultorders/part-m-00000
-rw-r--r--   1 cloudera supergroup     753022 2019-11-29 22:22 /queryresultorders/part-m-00001
-rw-r--r--   1 cloudera supergroup     752368 2019-11-29 22:22 /queryresultorders/part-m-00002
-rw-r--r--   1 cloudera supergroup     752940 2019-11-29 22:22 /queryresultorders/part-m-00003
[cloudera@quickstart ~]$

Observe that by default 4 mappers were used.



hdfs dfs -cat /queryresultorders/*

If the table is huge and has no key, a single mapper will not work well; we need to optimize (for example with --split-by, covered below).


Importing all of the tables

sqoop-import-all-tables \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username retail_dba \
--password cloudera \
--as-sequencefile \
-m 4 \
--warehouse-dir /user/cloudera/sqoopdir

We can specify the file format while sqooping data (see the sketch after this list):
 By default it uses the text file format.
 Sequence file format is supported.
 Avro file format is supported.
 Parquet file format is supported.

 ORC is not supported.
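A hedged sketch of importing the orders table as Avro by swapping in the file-format flag; the target directory name is illustrative:

sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--as-avrodatafile \
--target-dir /queryresultordersavro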


 warehouse-dir and target-dir?
 warehouse-dir : a sub-directory named after the table is created under it. When we are importing multiple tables
 we have to use warehouse-dir; target-dir cannot be used.


 target-dir : the data files are written directly into the given directory, with no per-table sub-directory, so it only works for a single-table import.



sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--warehouse-dir /orderswithwarehouse


hdfs dfs -ls /orderswithwarehouse


[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
> --username root \
> --password cloudera \
> --table orders \
> --warehouse-dir /orderswithwarehouse
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:34:23 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 22:34:23 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 22:34:24 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
19/11/29 22:34:24 INFO tool.CodeGenTool: Beginning code generation
19/11/29 22:34:24 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
19/11/29 22:34:24 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
19/11/29 22:34:24 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/2c43aaa902ea582daec0aa0cd99368a7/orders.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
19/11/29 22:34:25 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/2c43aaa902ea582daec0aa0cd99368a7/orders.jar
19/11/29 22:34:25 WARN manager.MySQLManager: It looks like you are importing from mysql.
19/11/29 22:34:25 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
19/11/29 22:34:25 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
19/11/29 22:34:25 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
19/11/29 22:34:25 INFO mapreduce.ImportJobBase: Beginning import of orders
19/11/29 22:34:25 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
19/11/29 22:34:26 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
19/11/29 22:34:26 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
19/11/29 22:34:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/11/29 22:34:28 INFO db.DBInputFormat: Using read commited transaction isolation
19/11/29 22:34:28 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`order_id`), MAX(`order_id`) FROM `orders`
19/11/29 22:34:28 INFO db.IntegerSplitter: Split size: 17220; Num splits: 4 from: 1 to: 68883
19/11/29 22:34:28 INFO mapreduce.JobSubmitter: number of splits:4
19/11/29 22:34:28 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1572771724749_0037
19/11/29 22:34:28 INFO impl.YarnClientImpl: Submitted application application_1572771724749_0037
19/11/29 22:34:28 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1572771724749_0037/
19/11/29 22:34:28 INFO mapreduce.Job: Running job: job_1572771724749_0037
19/11/29 22:34:36 INFO mapreduce.Job: Job job_1572771724749_0037 running in uber mode : false
19/11/29 22:34:36 INFO mapreduce.Job:  map 0% reduce 0%
19/11/29 22:34:47 INFO mapreduce.Job:  map 25% reduce 0%
19/11/29 22:34:50 INFO mapreduce.Job:  map 50% reduce 0%
19/11/29 22:34:51 INFO mapreduce.Job:  map 100% reduce 0%
19/11/29 22:34:52 INFO mapreduce.Job: Job job_1572771724749_0037 completed successfully
19/11/29 22:34:52 INFO mapreduce.Job: Counters: 31
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=685476
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=469
HDFS: Number of bytes written=2999944
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Killed map tasks=1
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=38440
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=38440
Total vcore-milliseconds taken by all map tasks=38440
Total megabyte-milliseconds taken by all map tasks=39362560
Map-Reduce Framework
Map input records=68883
Map output records=68883
Input split bytes=469
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=424
CPU time spent (ms)=13190
Physical memory (bytes) snapshot=989405184
Virtual memory (bytes) snapshot=6315798528
Total committed heap usage (bytes)=946339840
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=2999944
19/11/29 22:34:52 INFO mapreduce.ImportJobBase: Transferred 2.861 MB in 25.5098 seconds (114.8435 KB/sec)
19/11/29 22:34:52 INFO mapreduce.ImportJobBase: Retrieved 68883 records.
[cloudera@quickstart ~]$ hdfs dfs -ls /orderswithwarehouse
Found 1 items
drwxr-xr-x   - cloudera supergroup          0 2019-11-29 22:34 /orderswithwarehouse/orders
[cloudera@quickstart ~]$ hdfs dfs -ls /orderswithwarehouse/orders
Found 5 items
-rw-r--r--   1 cloudera supergroup          0 2019-11-29 22:34 /orderswithwarehouse/orders/_SUCCESS
-rw-r--r--   1 cloudera supergroup     741614 2019-11-29 22:34 /orderswithwarehouse/orders/part-m-00000
-rw-r--r--   1 cloudera supergroup     753022 2019-11-29 22:34 /orderswithwarehouse/orders/part-m-00001
-rw-r--r--   1 cloudera supergroup     752368 2019-11-29 22:34 /orderswithwarehouse/orders/part-m-00002
-rw-r--r--   1 cloudera supergroup     752940 2019-11-29 22:34 /orderswithwarehouse/orders/part-m-00003
[cloudera@quickstart ~]$



Observe in the hdfs dfs -ls /orderswithwarehouse/orders listing above that a sub-folder named after the table was created because of the warehouse-dir config.

==============================================================================================
# getting help for all tools

sqoop help
sqoop version

[cloudera@quickstart ~]$ sqoop version
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:42:21 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
Sqoop 1.4.6-cdh5.13.0
git commit id
Compiled by jenkins on Wed Oct  4 11:04:44 PDT 2017
[cloudera@quickstart ~]$


sqoop help eval

# getting help for the import tool
sqoop help import

==============================================================================================

--password : we need to enter the password on the command line in plain text

-P : it will prompt for the password


sqoop-list-databases \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
-P


[cloudera@quickstart ~]$ sqoop-list-databases \
> --connect "jdbc:mysql://quickstart.cloudera:3306" \
> --username retail_dba \
> -P
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:43:51 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
Enter password:
19/11/29 22:43:55 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
retail_db
[cloudera@quickstart ~]$

Observe that it prompts with "Enter password:" and waits for the password to be typed.

==============================================================================================


sqoop eval command: -e / --query

-e and --query are the same (see the sketch below)
-m and --num-mappers are the same
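A hedged sketch of the short form: the earlier sqoop-eval command written with -e instead of --query returns the same rows, and on imports -m 1 and --num-mappers 1 are likewise interchangeable:

sqoop-eval \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
--password cloudera \
-e "select * from retail_db.customers limit 10"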

==============================================================================================

Redirecting logs - stdout and stderr

1 is the file descriptor for stdout

2 is the file descriptor for stderr

1>query.out 2>query.err

When we automate jobs and need to inspect the logs, this redirection is needed.

sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--target-dir /queryresulttargetdir-err 1>query.out 2>query.err



[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
> --username root \
> --password cloudera \
> --table orders \
> --target-dir /queryresulttargetdir-err 1>query.out 2>query.err
;
[cloudera@quickstart ~]$ ;
bash: syntax error near unexpected token `;'
[cloudera@quickstart ~]$ ls
cloudera-manager  Desktop    Downloads  enterprise-deployment.json  kerberos  Music        parcels      Pictures  query.err  Templates  workspace
cm_api.py         Documents  eclipse    express-deployment.json     lib       orders.java  people.java  Public    query.out  Videos
[cloudera@quickstart ~]$ vi query.out
[cloudera@quickstart ~]$ vi query.err
[cloudera@quickstart ~]$ ls query.out
query.out
[cloudera@quickstart ~]$ ls query.err
query.err
[cloudera@quickstart ~]$


Observe that the files query.out and query.err were created.

==============================================================================================
sqoop import

target-dir

warehouse-dir

--append

--delete-target-dir

==============================================================================================

If there is no primary key, we will see how the import can be optimized.
==============================================================================================

What is the bounding vals query?

When Sqoop splits an import across mappers it runs a BoundingValsQuery (SELECT MIN(split_col), MAX(split_col)) on the split column to work out the split ranges, as seen in the logs above. When there is no primary key we have to either set the number of mappers to 1 or specify --split-by.

Use --split-by with a column of integral type.
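A hedged sketch of importing the keyless people table with --split-by on its integral PersonID column instead of dropping to one mapper; the target directory name is illustrative:

sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table people \
--split-by PersonID \
--num-mappers 4 \
--target-dir /queryresultsplitby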


==============================================================================================

file formats

--as-avrodatafile
--as-sequencefile
--as-parquetfile
==============================================================================================


By default --compress uses gzip compression.

To use a specific codec, pass --compression-codec (for example SnappyCodec).

--compress or -z (the default codec is gzip and the output files get a .gz extension)

--compression-codec (specify the compression codec class)

The available codecs are configured in /etc/hadoop/conf/core-site.xml.

--compress
--compression-codec org.apache.hadoop.io.compress.SnappyCodec
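A hedged sketch putting the compression flags together on an import; the target directory name is illustrative:

sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--target-dir /queryresultorderssnappy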

==============================================================================================
















Sunday, November 24, 2019

DataFrame code with take and show methods


package com.basan.day4.df

import org.apache.log4j._
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession

/**
 * 0 Will 33 385
 * 1 Jean-Luc 26 2
 * 2 Hugh 55 221
 * 3 Deanna 40 465
 * 4 Quark 68 21
 * 5 Weyoun 59 318
 * 6 Gowron 37 220
 * 7 Will 54 307*
 */

object DataFrameExampleSQL {

  case class Person(ID: Int, name: String, age: Int, numFriends: Int)

  def mapper(line: String): Person = {
    val fields = line.split(",")
    val person: Person = Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
    person
  }

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .appName("Data frame test using SQL")
      .master("local[*]")
      // needed only on Windows; check it
      // .config("spark.sql.warehouse.dir", "/Users/basan/Documents/Spark/tempwarehouse")
      // .config("spark.driver.allowMultipleContexts", "true")
      .getOrCreate()

    // val sc = new SparkContext("local[*]", "ERROR count with bigFile")

    val lines = spark.sparkContext.textFile("/Users/basan/workspace-spark/sparkDemo1/spark-data/friends-data.csv")
    import spark.implicits._
    val schemaPeople = lines.map(mapper).toDS()

    schemaPeople.createOrReplaceTempView("people")

    //SQL on the dataframe

    val teenagers = spark.sql("SELECT * from people where age>=13 AND age<=19")
        print("show-----")

    teenagers.show(2)
    print("take-----")
    teenagers.take(2)

    val results = teenagers.collect()

    results.foreach(println)

    scala.io.StdIn.readLine()

    spark.stop()

  }
 
 
  // Look into the Spark UI SQL tab and observe the WholeStageCodegen block that is generated.
  // Check the plan to see how the query gets executed; tuning can be done using the plans.
 
 

}
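For reference, a minimal sketch of the difference between show and take, assuming the teenagers DataFrame from the main method above: show only prints a formatted preview to the console and returns Unit, while take(n) returns the first n rows to the driver as an Array[Row] that can be used in further driver-side code.

// show prints a preview, nothing comes back to the program
teenagers.show(2)

// take brings the rows themselves back to the driver
val firstTwo: Array[org.apache.spark.sql.Row] = teenagers.take(2)
firstTwo.foreach(println)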

DataFrame Sample code

object DataFrameExample {

  case class Person(ID: Int, name: String, age: Int, numFriends: Int)

  def mapper(line: String): Person = {
    val fields = line.split(",")
    val person: Person = Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
    person
  }

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .appName("Data frame test")
      .master("local[*]")
      // needed only on Windows; check it
      // .config("spark.sql.warehouse.dir", "/Users/basan/Documents/Spark/tempwarehouse")
      //.config("spark.driver.allowMultipleContexts", "true")
      .getOrCreate()

    // val sc = new SparkContext("local[*]", "ERROR count with bigFile")

    val lines = spark.sparkContext.textFile("/Users/basan/workspace-spark/sparkDemo1/spark-data/friends-data.csv")
    import spark.implicits._
    val people = lines.map(mapper).toDS().cache()

    println("Inferred schema")
    people.printSchema()

    println("select the name column")
    people.select("name").show()

    println("Filter out anyone over 21")
    people.filter(people("age") < 21).show()

    println("group by age")
    people.groupBy("age").count().show()

    println("make everyone 10 years older")
    people.select(people("name"), (people("age") + 10)).show()
    people.select(people("name"), ((people("age") + 10)).alias("newcolumn")).show()


    spark.stop()

  }

}
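As a hedged alternative to the select with alias above, withColumn adds (or replaces) a named column on the same people Dataset:

// same "10 years older" column, added via withColumn instead of select
people.withColumn("newcolumn", people("age") + 10).show()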

Spark Submit - logs

cbc32a57245:bin z002qhl$ ./spark-submit --class com.basan.day3.ErrorWarnCount /Users/z002qhl/Desktop/sparkDemo.jar
2019-11-24 16:05:37 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-11-24 16:05:37 INFO  SparkContext:54 - Running Spark version 2.3.2
2019-11-24 16:05:37 INFO  SparkContext:54 - Submitted application: ERROR count
2019-11-24 16:05:37 INFO  SecurityManager:54 - Changing view acls to: z002qhl
2019-11-24 16:05:37 INFO  SecurityManager:54 - Changing modify acls to: z002qhl
2019-11-24 16:05:37 INFO  SecurityManager:54 - Changing view acls groups to:
2019-11-24 16:05:37 INFO  SecurityManager:54 - Changing modify acls groups to:
2019-11-24 16:05:37 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(z002qhl); groups with view permissions: Set(); users  with modify permissions: Set(z002qhl); groups with modify permissions: Set()
2019-11-24 16:05:38 INFO  Utils:54 - Successfully started service 'sparkDriver' on port 49223.
2019-11-24 16:05:38 INFO  SparkEnv:54 - Registering MapOutputTracker
2019-11-24 16:05:38 INFO  SparkEnv:54 - Registering BlockManagerMaster
2019-11-24 16:05:38 INFO  BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2019-11-24 16:05:38 INFO  BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up
2019-11-24 16:05:38 INFO  DiskBlockManager:54 - Created local directory at /private/var/folders/yh/xllvl9pd01g_skzq6_wg_3cst3pfgf/T/blockmgr-4d4dad0c-2af2-4bed-9246-1b3f1df4b40e
2019-11-24 16:05:38 INFO  MemoryStore:54 - MemoryStore started with capacity 366.3 MB
2019-11-24 16:05:38 INFO  SparkEnv:54 - Registering OutputCommitCoordinator
2019-11-24 16:05:38 INFO  log:192 - Logging initialized @31818ms
2019-11-24 16:05:38 INFO  Server:351 - jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
2019-11-24 16:05:38 INFO  Server:419 - Started @31906ms
2019-11-24 16:05:38 INFO  AbstractConnector:278 - Started ServerConnector@263f04ca{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-11-24 16:05:38 INFO  Utils:54 - Successfully started service 'SparkUI' on port 4040.
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2555fff0{/jobs,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@15bcf458{/jobs/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5af9926a{/jobs/job,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@fac80{/jobs/job/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@726386ed{/stages,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@649f2009{/stages/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@14bb2297{/stages/stage,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1a15b789{/stages/stage/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@57f791c6{/stages/pool,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@51650883{/stages/pool/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6c4f9535{/storage,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5bd1ceca{/storage/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@30c31dd7{/storage/rdd,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@499b2a5c{/storage/rdd/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@596df867{/environment,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@c1fca1e{/environment/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@241a53ef{/executors,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@344344fa{/executors/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2db2cd5{/executors/threadDump,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@70e659aa{/executors/threadDump/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@615f972{/static,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@70e0accd{/,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7957dc72{/api,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@82c57b3{/jobs/job/kill,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5be82d43{/stages/stage/kill,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://192.168.20.34:4040
2019-11-24 16:05:38 INFO  SparkContext:54 - Added JAR file:/Users/z002qhl/Desktop/sparkDemo.jar at spark://192.168.20.34:49223/jars/sparkDemo.jar with timestamp 1574591738675
2019-11-24 16:05:38 INFO  Executor:54 - Starting executor ID driver on host localhost
2019-11-24 16:05:38 INFO  Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49224.
2019-11-24 16:05:38 INFO  NettyBlockTransferService:54 - Server created on 192.168.20.34:49224
2019-11-24 16:05:38 INFO  BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2019-11-24 16:05:38 INFO  BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, 192.168.20.34, 49224, None)
2019-11-24 16:05:38 INFO  BlockManagerMasterEndpoint:54 - Registering block manager 192.168.20.34:49224 with 366.3 MB RAM, BlockManagerId(driver, 192.168.20.34, 49224, None)
2019-11-24 16:05:38 INFO  BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, 192.168.20.34, 49224, None)
2019-11-24 16:05:38 INFO  BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, 192.168.20.34, 49224, None)
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3703bf3c{/metrics/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:39 INFO  SparkContext:54 - Starting job: collect at ErrorWarnCount.scala:32
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Registering RDD 1 (map at ErrorWarnCount.scala:23)
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Got job 0 (collect at ErrorWarnCount.scala:32) with 8 output partitions
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Final stage: ResultStage 1 (collect at ErrorWarnCount.scala:32)
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Parents of final stage: List(ShuffleMapStage 0)
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Missing parents: List(ShuffleMapStage 0)
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Submitting ShuffleMapStage 0 (MapPartitionsRDD[1] at map at ErrorWarnCount.scala:23), which has no missing parents
2019-11-24 16:05:39 INFO  MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 3.1 KB, free 366.3 MB)
2019-11-24 16:05:39 INFO  MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 2008.0 B, free 366.3 MB)
2019-11-24 16:05:39 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on 192.168.20.34:49224 (size: 2008.0 B, free: 366.3 MB)
2019-11-24 16:05:39 INFO  SparkContext:54 - Created broadcast 0 from broadcast at DAGScheduler.scala:1039
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Submitting 8 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[1] at map at ErrorWarnCount.scala:23) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
2019-11-24 16:05:39 INFO  TaskSchedulerImpl:54 - Adding task set 0.0 with 8 tasks
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7878 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 3.0 in stage 0.0 (TID 3, localhost, executor driver, partition 3, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 4.0 in stage 0.0 (TID 4, localhost, executor driver, partition 4, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 5.0 in stage 0.0 (TID 5, localhost, executor driver, partition 5, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 6.0 in stage 0.0 (TID 6, localhost, executor driver, partition 6, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 7.0 in stage 0.0 (TID 7, localhost, executor driver, partition 7, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 5.0 in stage 0.0 (TID 5)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 7.0 in stage 0.0 (TID 7)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 3.0 in stage 0.0 (TID 3)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 6.0 in stage 0.0 (TID 6)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 0.0 in stage 0.0 (TID 0)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 4.0 in stage 0.0 (TID 4)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 2.0 in stage 0.0 (TID 2)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 1.0 in stage 0.0 (TID 1)
2019-11-24 16:05:39 INFO  Executor:54 - Fetching spark://192.168.20.34:49223/jars/sparkDemo.jar with timestamp 1574591738675
2019-11-24 16:05:39 INFO  TransportClientFactory:267 - Successfully created connection to /192.168.20.34:49223 after 34 ms (0 ms spent in bootstraps)
2019-11-24 16:05:39 INFO  Utils:54 - Fetching spark://192.168.20.34:49223/jars/sparkDemo.jar to /private/var/folders/yh/xllvl9pd01g_skzq6_wg_3cst3pfgf/T/spark-bf924cf5-40e0-47c7-a7aa-e08fc7f4a865/userFiles-3d0f3b95-2c81-4840-811c-4581c1c333e3/fetchFileTemp4604090937888069829.tmp
2019-11-24 16:05:40 INFO  Executor:54 - Adding file:/private/var/folders/yh/xllvl9pd01g_skzq6_wg_3cst3pfgf/T/spark-bf924cf5-40e0-47c7-a7aa-e08fc7f4a865/userFiles-3d0f3b95-2c81-4840-811c-4581c1c333e3/sparkDemo.jar to class loader
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 7.0 in stage 0.0 (TID 7). 1075 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 0.0 in stage 0.0 (TID 0). 989 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 1.0 in stage 0.0 (TID 1). 1075 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 2.0 in stage 0.0 (TID 2). 989 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 5.0 in stage 0.0 (TID 5). 1075 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 4.0 in stage 0.0 (TID 4). 989 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 3.0 in stage 0.0 (TID 3). 1075 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 6.0 in stage 0.0 (TID 6). 989 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 2.0 in stage 0.0 (TID 2) in 451 ms on localhost (executor driver) (1/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 7.0 in stage 0.0 (TID 7) in 451 ms on localhost (executor driver) (2/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 0.0 in stage 0.0 (TID 0) in 481 ms on localhost (executor driver) (3/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 1.0 in stage 0.0 (TID 1) in 456 ms on localhost (executor driver) (4/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 4.0 in stage 0.0 (TID 4) in 455 ms on localhost (executor driver) (5/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 3.0 in stage 0.0 (TID 3) in 456 ms on localhost (executor driver) (6/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 5.0 in stage 0.0 (TID 5) in 455 ms on localhost (executor driver) (7/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 6.0 in stage 0.0 (TID 6) in 454 ms on localhost (executor driver) (8/8)
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool
2019-11-24 16:05:40 INFO  DAGScheduler:54 - ShuffleMapStage 0 (map at ErrorWarnCount.scala:23) finished in 0.693 s
2019-11-24 16:05:40 INFO  DAGScheduler:54 - looking for newly runnable stages
2019-11-24 16:05:40 INFO  DAGScheduler:54 - running: Set()
2019-11-24 16:05:40 INFO  DAGScheduler:54 - waiting: Set(ResultStage 1)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - failed: Set()
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting ResultStage 1 (ShuffledRDD[2] at reduceByKey at ErrorWarnCount.scala:31), which has no missing parents
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_1 stored as values in memory (estimated size 3.2 KB, free 366.3 MB)
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_1_piece0 stored as bytes in memory (estimated size 2031.0 B, free 366.3 MB)
2019-11-24 16:05:40 INFO  BlockManagerInfo:54 - Added broadcast_1_piece0 in memory on 192.168.20.34:49224 (size: 2031.0 B, free: 366.3 MB)
2019-11-24 16:05:40 INFO  SparkContext:54 - Created broadcast 1 from broadcast at DAGScheduler.scala:1039
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting 8 missing tasks from ResultStage 1 (ShuffledRDD[2] at reduceByKey at ErrorWarnCount.scala:31) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Adding task set 1.0 with 8 tasks
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 1.0 in stage 1.0 (TID 8, localhost, executor driver, partition 1, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 3.0 in stage 1.0 (TID 9, localhost, executor driver, partition 3, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 4.0 in stage 1.0 (TID 10, localhost, executor driver, partition 4, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 5.0 in stage 1.0 (TID 11, localhost, executor driver, partition 5, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 6.0 in stage 1.0 (TID 12, localhost, executor driver, partition 6, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 7.0 in stage 1.0 (TID 13, localhost, executor driver, partition 7, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 0.0 in stage 1.0 (TID 14, localhost, executor driver, partition 0, ANY, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 2.0 in stage 1.0 (TID 15, localhost, executor driver, partition 2, ANY, 7649 bytes)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 3.0 in stage 1.0 (TID 9)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 2.0 in stage 1.0 (TID 15)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 5.0 in stage 1.0 (TID 11)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 7.0 in stage 1.0 (TID 13)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 0.0 in stage 1.0 (TID 14)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 6.0 in stage 1.0 (TID 12)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 4.0 in stage 1.0 (TID 10)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 1.0 in stage 1.0 (TID 8)
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 1 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 3 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 8 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 8 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 3.0 in stage 1.0 (TID 9). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 7.0 in stage 1.0 (TID 13). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 4.0 in stage 1.0 (TID 10). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 5.0 in stage 1.0 (TID 11). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 6.0 in stage 1.0 (TID 12). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 1.0 in stage 1.0 (TID 8). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 7.0 in stage 1.0 (TID 13) in 48 ms on localhost (executor driver) (1/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 3.0 in stage 1.0 (TID 9) in 50 ms on localhost (executor driver) (2/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 4.0 in stage 1.0 (TID 10) in 50 ms on localhost (executor driver) (3/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 5.0 in stage 1.0 (TID 11) in 50 ms on localhost (executor driver) (4/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 6.0 in stage 1.0 (TID 12) in 50 ms on localhost (executor driver) (5/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 1.0 in stage 1.0 (TID 8) in 53 ms on localhost (executor driver) (6/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 2.0 in stage 1.0 (TID 15). 1284 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 2.0 in stage 1.0 (TID 15) in 54 ms on localhost (executor driver) (7/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 0.0 in stage 1.0 (TID 14). 1285 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 0.0 in stage 1.0 (TID 14) in 56 ms on localhost (executor driver) (8/8)
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Removed TaskSet 1.0, whose tasks have all completed, from pool
2019-11-24 16:05:40 INFO  DAGScheduler:54 - ResultStage 1 (collect at ErrorWarnCount.scala:32) finished in 0.092 s
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Job 0 finished: collect at ErrorWarnCount.scala:32, took 1.088511 s
(ERROR ,3)
(WARN ,1)
single line conversion
2019-11-24 16:05:40 INFO  SparkContext:54 - Starting job: collect at ErrorWarnCount.scala:39
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Registering RDD 4 (map at ErrorWarnCount.scala:37)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Got job 1 (collect at ErrorWarnCount.scala:39) with 8 output partitions
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Final stage: ResultStage 3 (collect at ErrorWarnCount.scala:39)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Parents of final stage: List(ShuffleMapStage 2)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Missing parents: List(ShuffleMapStage 2)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting ShuffleMapStage 2 (MapPartitionsRDD[4] at map at ErrorWarnCount.scala:37), which has no missing parents
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_2 stored as values in memory (estimated size 3.1 KB, free 366.3 MB)
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_2_piece0 stored as bytes in memory (estimated size 2010.0 B, free 366.3 MB)
2019-11-24 16:05:40 INFO  BlockManagerInfo:54 - Added broadcast_2_piece0 in memory on 192.168.20.34:49224 (size: 2010.0 B, free: 366.3 MB)
2019-11-24 16:05:40 INFO  SparkContext:54 - Created broadcast 2 from broadcast at DAGScheduler.scala:1039
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting 8 missing tasks from ShuffleMapStage 2 (MapPartitionsRDD[4] at map at ErrorWarnCount.scala:37) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Adding task set 2.0 with 8 tasks
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 0.0 in stage 2.0 (TID 16, localhost, executor driver, partition 0, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 1.0 in stage 2.0 (TID 17, localhost, executor driver, partition 1, PROCESS_LOCAL, 7878 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 2.0 in stage 2.0 (TID 18, localhost, executor driver, partition 2, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 3.0 in stage 2.0 (TID 19, localhost, executor driver, partition 3, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 4.0 in stage 2.0 (TID 20, localhost, executor driver, partition 4, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 5.0 in stage 2.0 (TID 21, localhost, executor driver, partition 5, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 6.0 in stage 2.0 (TID 22, localhost, executor driver, partition 6, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 7.0 in stage 2.0 (TID 23, localhost, executor driver, partition 7, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 1.0 in stage 2.0 (TID 17)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 5.0 in stage 2.0 (TID 21)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 4.0 in stage 2.0 (TID 20)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 2.0 in stage 2.0 (TID 18)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 3.0 in stage 2.0 (TID 19)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 0.0 in stage 2.0 (TID 16)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 6.0 in stage 2.0 (TID 22)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 7.0 in stage 2.0 (TID 23)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 2.0 in stage 2.0 (TID 18). 946 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 6.0 in stage 2.0 (TID 22). 946 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 2.0 in stage 2.0 (TID 18) in 15 ms on localhost (executor driver) (1/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 6.0 in stage 2.0 (TID 22) in 14 ms on localhost (executor driver) (2/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 7.0 in stage 2.0 (TID 23). 1032 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 4.0 in stage 2.0 (TID 20). 946 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 7.0 in stage 2.0 (TID 23) in 15 ms on localhost (executor driver) (3/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 5.0 in stage 2.0 (TID 21). 1032 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 4.0 in stage 2.0 (TID 20) in 17 ms on localhost (executor driver) (4/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 5.0 in stage 2.0 (TID 21) in 18 ms on localhost (executor driver) (5/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 0.0 in stage 2.0 (TID 16). 946 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 3.0 in stage 2.0 (TID 19). 1032 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 1.0 in stage 2.0 (TID 17). 1032 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 0.0 in stage 2.0 (TID 16) in 22 ms on localhost (executor driver) (6/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 3.0 in stage 2.0 (TID 19) in 20 ms on localhost (executor driver) (7/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 1.0 in stage 2.0 (TID 17) in 22 ms on localhost (executor driver) (8/8)
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Removed TaskSet 2.0, whose tasks have all completed, from pool
2019-11-24 16:05:40 INFO  DAGScheduler:54 - ShuffleMapStage 2 (map at ErrorWarnCount.scala:37) finished in 0.030 s
2019-11-24 16:05:40 INFO  DAGScheduler:54 - looking for newly runnable stages
2019-11-24 16:05:40 INFO  DAGScheduler:54 - running: Set()
2019-11-24 16:05:40 INFO  DAGScheduler:54 - waiting: Set(ResultStage 3)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - failed: Set()
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting ResultStage 3 (ShuffledRDD[5] at reduceByKey at ErrorWarnCount.scala:39), which has no missing parents
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_3 stored as values in memory (estimated size 3.3 KB, free 366.3 MB)
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_3_piece0 stored as bytes in memory (estimated size 2027.0 B, free 366.3 MB)
2019-11-24 16:05:40 INFO  BlockManagerInfo:54 - Added broadcast_3_piece0 in memory on 192.168.20.34:49224 (size: 2027.0 B, free: 366.3 MB)
2019-11-24 16:05:40 INFO  SparkContext:54 - Created broadcast 3 from broadcast at DAGScheduler.scala:1039
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting 8 missing tasks from ResultStage 3 (ShuffledRDD[5] at reduceByKey at ErrorWarnCount.scala:39) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Adding task set 3.0 with 8 tasks
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 1.0 in stage 3.0 (TID 24, localhost, executor driver, partition 1, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 3.0 in stage 3.0 (TID 25, localhost, executor driver, partition 3, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 4.0 in stage 3.0 (TID 26, localhost, executor driver, partition 4, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 5.0 in stage 3.0 (TID 27, localhost, executor driver, partition 5, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 6.0 in stage 3.0 (TID 28, localhost, executor driver, partition 6, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 7.0 in stage 3.0 (TID 29, localhost, executor driver, partition 7, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 0.0 in stage 3.0 (TID 30, localhost, executor driver, partition 0, ANY, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 2.0 in stage 3.0 (TID 31, localhost, executor driver, partition 2, ANY, 7649 bytes)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 3.0 in stage 3.0 (TID 25)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 5.0 in stage 3.0 (TID 27)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 4.0 in stage 3.0 (TID 26)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 1.0 in stage 3.0 (TID 24)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 0.0 in stage 3.0 (TID 30)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 2.0 in stage 3.0 (TID 31)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 7.0 in stage 3.0 (TID 29)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 6.0 in stage 3.0 (TID 28)
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 1 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 1 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 3 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 1 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 1 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 2 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 1 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 1 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 2 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 2 ms
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 3.0 in stage 3.0 (TID 25). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 6.0 in stage 3.0 (TID 28). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 5.0 in stage 3.0 (TID 27). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 3.0 in stage 3.0 (TID 25) in 17 ms on localhost (executor driver) (1/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 2.0 in stage 3.0 (TID 31). 1327 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 7.0 in stage 3.0 (TID 29). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 6.0 in stage 3.0 (TID 28) in 17 ms on localhost (executor driver) (2/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 0.0 in stage 3.0 (TID 30). 1285 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 5.0 in stage 3.0 (TID 27) in 18 ms on localhost (executor driver) (3/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 4.0 in stage 3.0 (TID 26). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 2.0 in stage 3.0 (TID 31) in 16 ms on localhost (executor driver) (4/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 1.0 in stage 3.0 (TID 24). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 7.0 in stage 3.0 (TID 29) in 18 ms on localhost (executor driver) (5/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 0.0 in stage 3.0 (TID 30) in 18 ms on localhost (executor driver) (6/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 4.0 in stage 3.0 (TID 26) in 20 ms on localhost (executor driver) (7/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 1.0 in stage 3.0 (TID 24) in 21 ms on localhost (executor driver) (8/8)
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Removed TaskSet 3.0, whose tasks have all completed, from pool
2019-11-24 16:05:40 INFO  DAGScheduler:54 - ResultStage 3 (collect at ErrorWarnCount.scala:39) finished in 0.037 s
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Job 1 finished: collect at ErrorWarnCount.scala:39, took 0.073152 s
(ERROR ,3)
(WARN ,1)
2019-11-24 16:05:40 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2019-11-24 16:05:40 INFO  AbstractConnector:318 - Stopped Spark@263f04ca{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-11-24 16:05:40 INFO  SparkUI:54 - Stopped Spark web UI at http://192.168.20.34:4040
2019-11-24 16:05:40 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2019-11-24 16:05:40 INFO  MemoryStore:54 - MemoryStore cleared
2019-11-24 16:05:40 INFO  BlockManager:54 - BlockManager stopped
2019-11-24 16:05:40 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2019-11-24 16:05:40 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-11-24 16:05:40 INFO  SparkContext:54 - Successfully stopped SparkContext
2019-11-24 16:05:40 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-11-24 16:05:40 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/yh/xllvl9pd01g_skzq6_wg_3cst3pfgf/T/spark-bf924cf5-40e0-47c7-a7aa-e08fc7f4a865
2019-11-24 16:05:40 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/yh/xllvl9pd01g_skzq6_wg_3cst3pfgf/T/spark-4b1486ba-9b64-47d2-931d-941204c6f84e

Spark submit job

// split each "key:value" line into a (key, value) pair, group by key,
// then print every key with the number of values it received
originalRDD.map(x => (x.split(":")(0), x.split(":")(1)))
  .groupByKey()
  .collect()
  .foreach(x => println((x._1, x._2.size)))


Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Operation timed out: acbc32a57245.target.com/10.112.144.201:60678

320 MB file
In local mode it is split into partitions of about 32 MB each (the default local block size), so roughly 10 partitions (320 / 32).
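A quick way to check this (a minimal sketch, assuming sc is the active SparkContext and the file path is hypothetical):

val bigRdd = sc.textFile("/Users/basan/Desktop/big_320mb.log")   // hypothetical 320 MB local file
println(bigRdd.getNumPartitions)                                 // expect around 10 with ~32 MB splits in local mode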

groupByKey
will lead to
1. a lot of shuffle, since every value is moved across the network
2. only a few machines doing work, because parallelism is limited by the number of distinct keys
3. chances of memory spill, since all values for a key are gathered on one machine




What is the difference between groupByKey and reduceByKey?
  With reduceByKey, local aggregation happens within each partition before the shuffle - much like the local combiner in MapReduce.

The per-partition partial results are then shuffled and merged, so far less data moves across the network. With groupByKey there is no map-side aggregation: every value travels to the machine that owns its key.

Don't use groupByKey; use reduceByKey instead.
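A minimal sketch of both approaches, assuming originalRDD is the same RDD of "LEVEL:timestamp" lines used above:

val pairs = originalRDD.map(x => (x.split(":")(0), 1))

// groupByKey: every single 1 is shuffled across the network, counting happens on the reducer side
val countsViaGroup = pairs.groupByKey().mapValues(_.size)

// reduceByKey: the 1s are summed inside each partition first, only the partial sums are shuffled
val countsViaReduce = pairs.reduceByKey(_ + _)

countsViaReduce.collect().foreach(println)

Both give the same counts; only the amount of shuffled data differs.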


What is the difference between reduce and reduceByKey?

reduce is an action - it returns a single value to the driver (a local variable).
reduceByKey is a transformation - it returns another rdd.
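A minimal sketch of the distinction, assuming sc is the active SparkContext:

val nums = sc.parallelize(List(1, 2, 3, 4))
val total: Int = nums.reduce(_ + _)            // action: a plain value on the driver

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
val summed = pairs.reduceByKey(_ + _)          // transformation: still an RDD[(String, Int)]
summed.collect().foreach(println)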


What is narrow and wide transformation?
narrow - each output partition depends on only one input partition, so no shuffling (map, filter).
wide - an output partition depends on many input partitions, so a shuffle is needed (groupByKey, reduceByKey, join); see the sketch a few lines below.

What is a stage?
A stage is a set of tasks that can run without a shuffle; the DAG scheduler cuts a job into stages at every wide (shuffle) dependency - the ShuffleMapStage / ResultStage pairs seen in the log above.

Why is Spark lazy?
Because nothing executes until an action, Spark can optimise the whole plan first:
  predicate pushdown : plans can be reordered internally, moving filters closer to the data source so less data is processed.
  pipelining : consecutive narrow transformations are combined into a single stage.
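A minimal sketch that makes both points visible, assuming sc is the active SparkContext and a hypothetical input path; toDebugString prints the lineage, and nothing runs until collect():

val counts = sc.textFile("in/sample.log")       // hypothetical path
  .flatMap(_.split(" "))                        // narrow
  .filter(_.nonEmpty)                           // narrow - pipelined with flatMap into the same stage
  .map(w => (w, 1))                             // narrow
  .reduceByKey(_ + _)                           // wide - the shuffle here starts a new stage

println(counts.toDebugString)                   // lineage; the ShuffledRDD marks the stage boundary
counts.collect()                                // only this action triggers execution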

Why are RDDs immutable?
To achieve resiliency: because an RDD never changes, any lost partition can be recomputed from its lineage.

countByValue and reduceByKey
If the count is the final result and no further rdd operations are needed, use countByValue (an action that returns a Map to the driver).
If further rdd operations are needed, use reduceByKey (a transformation that returns an rdd).
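A minimal sketch of both, assuming sc is the active SparkContext:

val levels = sc.parallelize(List("ERROR", "WARN", "ERROR", "ERROR"))

// the count is the final answer: countByValue is an action and returns a Map on the driver
val finalCounts: scala.collection.Map[String, Long] = levels.countByValue()

// more rdd work still to come: reduceByKey keeps the result distributed
val stillAnRdd = levels.map((_, 1)).reduceByKey(_ + _).filter(_._2 > 1)
stillAnRdd.collect().foreach(println)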



What is the difference between cache and persist?

cache always stores the data in memory (it is persist with the MEMORY_ONLY level).
persist lets you choose the storage level: memory only, disk only, or memory-and-disk.

rdd1, rdd2, rdd3, rdd4 are a chain of transformations.
If the data is fully cleaned by rdd4 and will be reused by later actions, cache it at that point:
use rdd4.cache

If it does not fit in memory - use persist with a disk-backed storage level.

rdd1, rdd2, rdd3, rdd4.cache, rdd5, rdd6 action1 action2

action1 : all transformations rdd1 to rdd6 are executed, and rdd4 is materialised into the cache
action2 : only transformations rdd4 to rdd6 are executed, with rdd4 read from the cache
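A minimal sketch of both, assuming sc is the active SparkContext and a hypothetical cleaning chain; StorageLevel comes from org.apache.spark.storage:

import org.apache.spark.storage.StorageLevel

val cleaned = sc.textFile("in/raw.log")          // hypothetical input
  .filter(_.nonEmpty)
  .map(_.toLowerCase)

cleaned.cache()                                  // same as persist(StorageLevel.MEMORY_ONLY)
// cleaned.persist(StorageLevel.MEMORY_AND_DISK) // use this instead if the data may not fit in memory

println(cleaned.count())   // first action: runs the whole chain and fills the cache
println(cleaned.count())   // second action: served from the cache, earlier steps are not recomputed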

======================================================================================================

cd /Users/z002qhl/Documents/Spark/spark-2.4.1-bin-hadoop2.7/bin
./spark-submit --class com.basan.day3.ErrorWarnCount /Users/z002qhl/Desktop/sparkDemo.jar

Saturday, November 23, 2019

Bucketing Hive


bucketing

A bucketed table is declared with clustered by (column) into N buckets.
Unlike a partition column, the bucketing column is part of the regular create table column list.

hive> set hive.enforce.bucketing
    > ;
hive.enforce.bucketing=false


To enable bucketing
set hive.enforce.bucketing=true


//enable bucketing
set hive.enforce.bucketing=true

//create table without any bucket
create table products_no_buckets
(
  id int,
  name string,
  cost double,
  category string
)
row format delimited fields terminated by ',';

//load the data to no bucket table
load data local inpath '/home/cloudera/Desktop/newproducts.csv'
into table products_no_buckets;



newproducts.csv

1,iPhone,379.99,mobiles
2,doll,8.99,toys
3,Galaxy X,100,mobile
5,Nokia Y,39.99,mobile
6,truck,7.99,toys
7,makeup,100,fashion
8,earings,69,fashion
9,chair,129,furniture
10,table,269,furniture
11,waterpistol,9,toys


//create table with bucketing
create table products_w_buckets ( id int, name string, cost double, category string ) CLUSTERED BY (id) INTO 4 BUCKETS;

//insert data from non bucket table to bucketed table
insert into table products_w_buckets select id, name, cost, category from products_no_buckets;

//check the file created for each bucket
hdfs dfs -ls /user/hive/warehouse/basan.db/products_w_buckets

[root@quickstart Desktop]# hdfs dfs -ls /user/hive/warehouse/basan.db/products_w_buckets
Found 1 items
-rwxrwxrwx   1 cloudera supergroup        225 2019-11-23 23:42 /user/hive/warehouse/basan.db/products_w_buckets/000000_0
[root@quickstart Desktop]# 

Partitioning and folder creation in Hive


The column on which we want to partition must not be in the create table column definitions;
it is declared only in the partitioned by clause.

But when we do a select, the partition column is still shown, as a virtual column.

static partitioning and dynamic partitioning (a dynamic-partition insert is sketched below)
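Only static partitioning is used in the examples below; as a minimal sketch, a dynamic-partition insert (assuming the orders_w_partition and orders_no_partition tables defined in the following examples, with orders_no_partition already holding data) would look like this:

-- enable dynamic partitioning for the session
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

-- the partition column (state) goes last in the select list;
-- Hive creates one state=XX folder per distinct value it finds
insert into table orders_w_partition partition(state)
select id, customer_id, product_id, quantity, amount, zipcode, state
from orders_no_partition;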


http://www.decloedtjr.com.br/hive-cheat-sheet/
http://rkeagle.com/public/blog.php?category=hadoop&page=9

create table orders_no_partition
(
  id string,
  customer_id string,
  product_id string,
  quantity int,
  amount double,
  zipcode char(5),
  state char(2)
)
COMMENT 'Table with no partition applied'
row format delimited fields terminated by ',';



Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.
hive> use basan
    > ;
OK
Time taken: 0.31 seconds
hive> create table orders_no_partition
    > (
    >   id string,
    >   customer_id string,
    >   product_id string,
    >   quantity int,
    >   amount double,
    >   zipcode char(5),
    >   state char(2)
    > )
    > COMMENT 'Table with no partition applied'
    > row format delimited fields terminated by ',';
OK
Time taken: 0.256 seconds
hive> describe formatted orders_no_partition;
OK
# col_name            data_type            comment           

id                  string                                 
customer_id          string                                 
product_id          string                                 
quantity            int                                     
amount              double                                 
zipcode              char(5)                                 
state                char(2)                                 

# Detailed Table Information
Database:            basan               
Owner:              cloudera           
CreateTime:          Sat Nov 23 22:35:16 PST 2019
LastAccessTime:      UNKNOWN             
Protect Mode:        None               
Retention:          0                   
Location:            hdfs://quickstart.cloudera:8020/user/hive/warehouse/basan.db/orders_no_partition
Table Type:          MANAGED_TABLE       
Table Parameters:
comment              Table with no partition applied
transient_lastDdlTime 1574577316         

# Storage Information
SerDe Library:      org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:        org.apache.hadoop.mapred.TextInputFormat
OutputFormat:        org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed:          No                 
Num Buckets:        -1                 
Bucket Columns:      []                 
Sort Columns:        []                 
Storage Desc Params:
field.delim          ,                 
serialization.format ,                 
Time taken: 0.153 seconds, Fetched: 34 row(s)
hive>



hive> create table orders_w_partition
    > (
    >   id string,
    >   customer_id string,
    >   product_id string,
    >   quantity int,
    >   amount double,
    >   zipcode char(5)
    > )
    > COMMENT 'Table with partition applied by state'
    > partitioned by (state char(2));
OK
Time taken: 0.124 seconds
hive>
    > show tables like 'orders_w*';
OK
orders_w_partition
Time taken: 0.024 seconds, Fetched: 1 row(s)
hive>


show tables like 'orders_w*';
OK
+---------------------+--+
|      tab_name       |
+---------------------+--+
| orders_w_partition  |
+---------------------+--+

show partitions orders_w_partition;
OK
+------------+--+
| partition  |
+------------+--+
+------------+--+

insert into orders_w_partition
partition(state="CA")
values
("o1", "c1", "p1", 1, 1.11, "90111"),
("o2", "c2", "p2", 1, 2.22, "90222"),
("o3", "c3", "p3", 1, 3.33, "90333"),
("o4", "c4", "p4", 1, 4.44, "90444");

insert into orders_w_partition
partition(state="WA")
values
("o10", "c10", "p10", 2, 10.11, "91111"),
("o20", "c20", "p20", 2, 20.22, "91222"),
("o30", "c30", "p30", 2, 30.33, "91333"),
("o40", "c40", "p40", 2, 40.44, "91444");

insert into orders_w_partition
partition(state="CA")
values ("o5", "c5", "p5", 1, 5.55, "90555");

insert into orders_w_partition
partition(state="NJ")
values
("o100", "c100", "p100", 3, 100.11, "92111"),
("o200", "c200", "p200", 3, 200.22, "92222");

insert into orders_w_partition
partition(state="NY")
values
("o201", "c201", "p201", 4, 201.22, "92122");





hive> show partitions orders_w_partition;
OK
state=CA
state=NJ
state=NY
state=WA
Time taken: 0.069 seconds, Fetched: 4 row(s)


[root@quickstart Desktop]# hdfs dfs -ls /user/hive/warehouse/basan.db/orders_w_partition
Found 4 items
drwxrwxrwx   - cloudera supergroup          0 2019-11-23 22:40 /user/hive/warehouse/basan.db/orders_w_partition/state=CA
drwxrwxrwx   - cloudera supergroup          0 2019-11-23 22:41 /user/hive/warehouse/basan.db/orders_w_partition/state=NJ
drwxrwxrwx   - cloudera supergroup          0 2019-11-23 22:41 /user/hive/warehouse/basan.db/orders_w_partition/state=NY
drwxrwxrwx   - cloudera supergroup          0 2019-11-23 22:40 /user/hive/warehouse/basan.db/orders_w_partition/state=WA
[root@quickstart Desktop]#

Observe the folders created for each partition

[root@quickstart Desktop]# hdfs dfs -ls /user/hive/warehouse/basan.db/orders_w_partition/state=CA
Found 2 items
-rwxrwxrwx   1 cloudera supergroup         88 2019-11-23 22:39 /user/hive/warehouse/basan.db/orders_w_partition/state=CA/000000_0
-rwxrwxrwx   1 cloudera supergroup         22 2019-11-23 22:40 /user/hive/warehouse/basan.db/orders_w_partition/state=CA/000000_0_copy_1
[root@quickstart Desktop]# hdfs dfs -cat /user/hive/warehouse/basan.db/orders_w_partition/state=CA/000000_0
o1 c1 p1 1 1.11 90111
o2 c2 p2 1 2.22 90222
o3 c3 p3 1 3.33 90333
o4 c4 p4 1 4.44 90444
[root@quickstart Desktop]# pwd

==========================================================

Loading from Files into a Partitioned Table

drop table orders_w_partition;

create table orders_w_partition
(
  id string,
  customer_id string,
  product_id string,
  quantity int,
  amount double,
  zipcode char(5)
)
COMMENT 'Table with partition applied by state, load data from fs'
partitioned by (state char(2))
row format delimited fields terminated by ',';




load data local inpath '/home/cloudera/Desktop/orders_CA.csv'
into table orders_w_partition
partition (state="CA");

load data local inpath '/home/cloudera/Desktop/orders_CT.csv'
into table orders_w_partition
partition (state="CT");

show partitions orders_w_partition;


[root@quickstart Desktop]# hdfs dfs -ls /user/hive/warehouse/basan.db/orders_w_partition
Found 2 items
drwxrwxrwx   - cloudera supergroup          0 2019-11-23 22:53 /user/hive/warehouse/basan.db/orders_w_partition/state=CA
drwxrwxrwx   - cloudera supergroup          0 2019-11-23 22:53 /user/hive/warehouse/basan.db/orders_w_partition/state=CT
[root@quickstart Desktop]#