
Tuesday, December 17, 2019

Ordering of Columns based on table name

import org.apache.spark.sql.{DataFrame, SparkSession}

def getOrderedDFBasedOnTableStructure(inputDF: DataFrame, tableName: String, spark: SparkSession): DataFrame = {
  val columnNames = spark.table(tableName).columns
  logInfo(s"table $tableName has columns in this order: ${columnNames.mkString(", ")}")
  val orderedDF = inputDF.select(columnNames.head, columnNames.tail: _*)
  orderedDF
}
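
A hypothetical usage sketch (the table name mydb.orders and the append mode are assumptions, not part of the original note): reorder the DataFrame columns to match the target table before a positional insertInto.

val ordered = getOrderedDFBasedOnTableStructure(inputDF, "mydb.orders", spark)
ordered.write.mode("append").insertInto("mydb.orders")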

Saturday, December 7, 2019

Kafka, broker, producer, consumer

kafka

Please download Kafka in the Cloudera VM using this link:
https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz


Inside the bin directory, start ZooKeeper:

zookeeper-server-start.sh ../config/zookeeper.properties



telnet localhost 2181

stat

./kafka-server-start.sh ../config/server.properties


./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1

./kafka-topics.sh --list --zookeeper localhost:2181

./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning


cd /tmp/kafka-logs

./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 3

./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 3 --partitions 3

./kafka-topics.sh --describe --topic my_topic --zookeeper localhost:2181

==============================================================================================================================

Each machine running Kafka is called a broker.
We can also run multiple brokers on a single machine.


topic : e.g. twitterTopic; a topic is split into partitions.
partition : an ordered subset of a topic's messages.


Producers produce messages. Each partition is stored on a broker.
If we have a 3-broker cluster and 3 partitions, each partition will be stored on a different broker.


We use the replication factor to store copies of the same partition on different brokers.

zookeeper : decides which broker is the leader and keeps track of all the nodes.
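
To make the producer/partition idea above concrete, here is a minimal Scala producer sketch for the my_topic topic created with the commands above. It assumes the kafka-clients library (matching the 1.1.0 download) is on the classpath and a broker is listening on localhost:9092; the key and value strings are just examples.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// Messages with the same key always land in the same partition of my_topic.
val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("my_topic", "key1", "hello from scala"))
producer.close()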


==============================================================================================================================
cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config

zookeeper-server-start.sh ../config/zookeeper.properties

(base) basanmachine:config basan$ cat server.properties

log.retention.hours=168
log.retention.check.interval.ms=300000



broker.id=0
# Each broker must have a different id.
listeners=PLAINTEXT://:9092  # port on which the Kafka broker listens by default.

log.dirs=/tmp/kafka-logs  # where the messages are stored.

==============================================================================================================================
start the broker

(base) basanmachine:bin basan$ ./kafka-server-start.sh ../config/server.prop

On a successful start we will get the below message:

[2019-12-07 17:07:17,615] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:07:17,617] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

==============================================================================================================================

creating topic

(base) basanmachine:bin basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1


(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "my_topic".
(base) basanmachine:bin basan$

==============================================================================================================================
listing the topic

(base) basanmachine:bin basan$ ./kafka-topics.sh --list --zookeeper localhost:2181
my_topic
(base) basanmachine:bin basan$

==============================================================================================================================

producing messages to kafka
(base) basanmachine:bin basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$

./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic


==============================================================================================================================

consuming messages from kafka

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning


base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$
(base) basanmachine:bin basan$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

==============================================================================================================================

checking stored content

(base) basanmachine:kafka-logs basan$ ls
cleaner-offset-checkpoint
log-start-offset-checkpoint
meta.properties
recovery-point-offset-checkpoint
replication-offset-checkpoint


==============================================================================================================================

RabbitMQ vs Kafka

As soon as a message is read from RabbitMQ it is gone, but in Kafka the message is retained until it
expires (per the retention settings, e.g. log.retention.hours above).

The same applies to ActiveMQ.


==============================================================================================================================

Creating 3 brokers: for each broker we have to change the broker id, the port, and the log location.

/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config
(base) basanmachine:config basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config
(base) basanmachine:config basan$ cp server.properties server2.properties
(base) basanmachine:config basan$ cp server.properties server3.properties
(base) basanmachine:config basan$vi server2.properties

change the broker.id=2
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs2


(base) basanmachine:config basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/config
(base) basanmachine:config basan$vi server3.properties
broker.id=3
listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kafka-logs3


(base) basanmachine:bin basan$ pwd
/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
start the brokers and zookeeper
./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties
./kafka-server-start.sh ../config/server2.properties
./kafka-server-start.sh ../config/server3.properties

[2019-12-07 17:44:42,822] INFO [ReplicaAlterLogDirsManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)

[2019-12-07 17:45:23,601] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:45:03,132] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2019-12-07 17:45:03,133] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)



(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
./kafka-topics.sh --create --topic multipletopic --zookeeper localhost:2181 --replication-factor 3 --partitions 3

./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181



(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic multipletopic --zookeeper localhost:2181 --replication-factor 3 --partitions 3
Created topic "multipletopic".
(base) basanmachine:bin basan$


(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,3,2
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3
Topic: multipletopic Partition: 2 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
(base) basanmachine:bin basan$

Observe the folders multipletopic-0, multipletopic-1, multipletopic-2 below: this broker holds a replica of all 3 partitions because we created the topic with replication factor 3.


(base) basanmachine:bin basan$ cd /tmp/kafka-logs
(base) basanmachine:kafka-logs basan$ ls
cleaner-offset-checkpoint
log-start-offset-checkpoint
meta.properties
multipletopic-0
multipletopic-1
multipletopic-2
my_topic-0
recovery-point-offset-checkpoint
replication-offset-checkpoint
(base) basanmachine:kafka-logs basan$


(base) basanmachine:multipletopic-0 basan$ pwd
/tmp/kafka-logs/multipletopic-0
(base) basanmachine:multipletopic-0 basan$ ls
00000000000000000000.index 00000000000000000000.timeindex
00000000000000000000.log leader-epoch-checkpoint
(base) basanmachine:multipletopic-0 basan$


========================================================================================================

If we bring any broker down, the in-sync replica (Isr) list shrinks, which can be seen with the describe command.


(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,2
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0
Topic: multipletopic Partition: 2 Leader: 2 Replicas: 3,2,0 Isr: 2,0

Now I will restart the broker; the in-sync replica list will change again.


    (base) basanmachine:bin basan$ ./kafka-server-start.sh ../config/server3.properties


(base) basanmachine:bin basan$ ./kafka-topics.sh --describe --topic multipletopic --zookeeper localhost:2181
Topic:multipletopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: multipletopic Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,2,3
Topic: multipletopic Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3
Topic: multipletopic Partition: 2 Leader: 2 Replicas: 3,2,0 Isr: 2,0,3
(base) basanmachine:bin basan$

Observe that the in-sync replicas came back up.


========================================================================================================

/Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin

./kafka-topics.sh --create --topic basan --zookeeper localhost:2181 --replication-factor 1 --partitions 1

./kafka-topics.sh --list --zookeeper localhost:2181

./kafka-console-producer.sh --broker-list localhost:9092 --topic basan

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning



(base) basanmachine:bin basan$ ./kafka-topics.sh --create --topic basan --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "basan".
(base) basanmachine:bin basan$ ./kafka-topics.sh --list --zookeeper localhost:2181
basan
multipletopic
my_topic
(base) basanmachine:bin basan$


(base) basanmachine:bin basan$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic basan
>basan
test
test2


./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning



(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
-bash: cd: /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin: No such file or directory
(base) basanmachine:~ basan$ /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
-bash: /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin: is a directory
(base) basanmachine:~ basan$ cd /Users/basan/Documents/HadoopTraining/Kafka/kafka_2.11-1.1.0/bin
(base) basanmachine:bin basan$
(base) basanmachine:bin basan$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic basan --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
basan
test
test2

Spark Dstream

We will be processing continuous messages from sources like Kafka or Kinesis.
Streaming data is converted into RDDs by defining a micro-batch interval.

Flink is a true streaming solution.


A DStream consists of a set of RDDs. Spark performs operations on the RDDs.

Messages arriving within a given interval are aggregated to form an RDD. The set of RDDs makes up the DStream.

Within DStreams, Spark does batch processing on the individual RDDs.


For a DStream we need a StreamingContext. The StreamingContext is built using the SparkContext and the batch time interval.


Socket = IP + port

=========================

The below command can be used to produce messages:
nc -lk 9999



(base) basanmachine:SparkProjects basan$ nc -lk 9999
test
test2
basan
patil
dummy producer basan


(base) basanmachine:~ $ spark-shell --master local[2]


(base) basanmachine:~ basan$ spark-shell --master local[2]
2019-12-07 15:26:24 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[2], app id = local-1575712589217).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala>


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc= new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word =>(word,1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()




scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._

scala> val ssc= new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@350342e0

scala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@53079ae6

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@423762ae

scala> val pairs = words.map(word =>(word,1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@4e9695cf

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@2351255a

scala> wordCounts.print()

scala>



scala> ssc.start()

scala> 2019-12-07 15:37:12 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:12 WARN  BlockManager:66 - Block input-0-1575713232400 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713235000 ms
-------------------------------------------
(test2,1)
(test,1)
(patil,1)
(basan,1)

-------------------------------------------
Time: 1575713240000 ms
-------------------------------------------

-------------------------------------------
Time: 1575713245000 ms
-------------------------------------------

2019-12-07 15:37:25 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:25 WARN  BlockManager:66 - Block input-0-1575713245200 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713250000 ms
-------------------------------------------
(test2,1)
(3,1)
(tet,1)

[Stage 0:>                                                          (0 + 1) / 1]2019-12-07 15:37:34 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:34 WARN  BlockManager:66 - Block input-0-1575713253800 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713255000 ms
-------------------------------------------
(basana,1)

[Stage 0:>                                                          (0 + 1) / 1]2019-12-07 15:37:39 WARN  RandomBlockReplicationPolicy:66 - Expecting 1 replicas with only 0 peer/s.
2019-12-07 15:37:39 WARN  BlockManager:66 - Block input-0-1575713259600 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1575713260000 ms
-------------------------------------------
(patil,1)
(basan,1)

-------------------------------------------

==================================================================================================================
pushing messages from socket
(base) basanmachine:SparkProjects basan$ nc -lk 9999
basan patil
test test2
test2 tet 3
basana
patil basan


==================================================================================================================
Observe the results: each batch aggregates only the last 5 seconds of data and is not concerned with previous results. This is the behaviour
of a stateless transformation.


stateless transformation : the operation is applied to each RDD independently and ignores the previous results.

==================================================================================================================
stateful transformation :
We can accumulate results from the start of the stream, or work on the last few RDDs by defining a window.

updateStateByKey(updateFn) - is the stateful function.

Pair RDD : an RDD of (key, value) tuples.
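
A minimal sketch of a running (stateful) word count with updateStateByKey, assuming the same ssc and pairs values from the spark-shell session above; the checkpoint directory path is an arbitrary choice.

ssc.checkpoint("/tmp/streaming-checkpoint")   // stateful operations need a checkpoint directory

def updateFn(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
  Some(newValues.sum + runningCount.getOrElse(0))

val runningCounts = pairs.updateStateByKey[Int](updateFn _)
runningCounts.print()   // prints the cumulative count per word, not just the current 5-second batch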

==================================================================================================================

Window Operations :
Concerned only with the last certain time interval.

summary function - adds the contribution of the newly incoming RDDs
Inverse function - subtracts the contribution of the outgoing RDDs

A checkpoint is needed to track the changes.

countByWindow - is a stateful windowed operation

lines.countByWindow(Seconds(10), Seconds(2))

The window size is 10 seconds and 2 seconds is the sliding interval.
Every 2 seconds it removes the RDDs that fall out of the window and adds the new ones.


sliding interval : has to be a multiple of the StreamingContext batch interval, as it cannot read half an RDD.
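
The summary/inverse idea above maps onto reduceByKeyAndWindow. A minimal sketch, assuming the same pairs DStream and 5-second batch interval from the earlier shell session, and that ssc.checkpoint(...) has been set as in the stateful example:

val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // summary function: add counts from RDDs entering the window
  (a: Int, b: Int) => a - b,   // inverse function: subtract counts from RDDs leaving the window
  Seconds(10),                 // window size, a multiple of the 5-second batch interval
  Seconds(5))                  // sliding interval, also a multiple of the batch interval
windowedCounts.print()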

==================================================================================================================

If the producer is much faster than the consumer, data loss can happen with a socket stream. That is where a buffering
mechanism like Kafka comes in.
Most streaming solutions use Kafka.

A Twitter stream can be the example.

Saturday, November 30, 2019

Dataframe simple operations


package com.basan.day5.df

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import org.apache.spark.sql.Column

object LoggingWithCaseClass {

  case class Logging(level: String, datetime: String)

  def mapper(line: String): Logging = {
    val fields = line.split(",")
    val logging: Logging = Logging(fields(0), fields(1))
    logging
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Use new SparkSession interface in Spark 2.0
    val spark = SparkSession
      .builder
      .appName("SparkSQL")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
      .getOrCreate()

    import spark.implicits._

    var mylist = List(
      "WARN,2016-12-31 04:19:32",
      "FATAL,2016-12-31 03:19:32",
      "WARN,2016-12-31 04:19:32",
      "WARN,2016-12-31 04:19:31",
      "INFO,2016-12-31 04:19:32",
      "FATAL,2016-12-31 14:19:32")

    val rdd1 = spark.sparkContext.parallelize(mylist)
    val rdd2 = rdd1.map(mapper)

    val df1 = rdd2.toDF()
    df1.show()
    df1.createOrReplaceTempView("logging_table")

    spark.sql("Select * from logging_table ").show()
    //show(false) will display all the results
    spark.sql("Select level , collect_list(datetime) from logging_table group by level order by level").show(false)

    spark.sql("Select level , count(datetime) from logging_table group by level order by level").show(false)
    
    
    spark.sql("select level, datetime from logging_table").show() 
    
    
    

  }

}






Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+-----+-------------------+
|level|           datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+

+-----+-------------------+
|level|           datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+

+-----+---------------------------------------------------------------+
|level|collect_list(datetime)                                         |
+-----+---------------------------------------------------------------+
|FATAL|[2016-12-31 03:19:32, 2016-12-31 14:19:32]                     |
|INFO |[2016-12-31 04:19:32]                                          |
|WARN |[2016-12-31 04:19:32, 2016-12-31 04:19:32, 2016-12-31 04:19:31]|
+-----+---------------------------------------------------------------+

+-----+---------------+
|level|count(datetime)|
+-----+---------------+
|FATAL|2              |
|INFO |1              |
|WARN |3              |
+-----+---------------+

+-----+-------------------+
|level|           datetime|
+-----+-------------------+
| WARN|2016-12-31 04:19:32|
|FATAL|2016-12-31 03:19:32|
| WARN|2016-12-31 04:19:32|
| WARN|2016-12-31 04:19:31|
| INFO|2016-12-31 04:19:32|
|FATAL|2016-12-31 14:19:32|
+-----+-------------------+


Friday, November 29, 2019

Sqoop basics

sqoop

When we connect to Hadoop, we connect to an edge node, which internally
takes care of executing the jobs in the cluster.

connecting to mysql from hadoop

sqoop-list-databases \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
--password cloudera

[cloudera@quickstart ~]$ hdfs dfs -ls
Found 1 items
drwxr-xr-x   - cloudera cloudera          0 2019-11-03 03:58 _sqoop
[cloudera@quickstart ~]$ sqoop-list-databases \
> --connect "jdbc:mysql://quickstart.cloudera:3306" \
> --username retail_dba \
> --password cloudera
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 21:49:45 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 21:49:45 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 21:49:45 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
retail_db
[cloudera@quickstart ~]$



Now try listing the tables (use the root user to see all tables listed).


sqoop-list-tables \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username retail_dba \
--password cloudera


[cloudera@quickstart ~]$ sqoop-list-tables \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username retail_dba \
> --password cloudera
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 21:56:50 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 21:56:50 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 21:56:50 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
categories
customers
departments
order_items
orders
products
[cloudera@quickstart ~]$




sqoop-eval \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
--password cloudera \
--query "select * from retail_db.customers limit 10"

https://sqoop.apache.org/docs/1.4.4/SqoopUserGuide.html#_controlling_the_import_process

sqoop import \
-Dgapreduce.job.queuename=${job_queue_nm} \
-Dmapreduce.job.queuename=${job_queue_nm} \
-Dmapred.job.queuename=${job_queue_nm} \
--options-file ${goa_lkup_dir}/connect.txt \
--delete-target-dir \
--target-dir ${landingdir}/order_node_allocation_reason_e \
--hive-drop-import-delims \
--query "select nearest_node_json from order_node_allocation_reason \
where insert_time >= '$strt_ts'  and  \$CONDITIONS" \
--null-string '\\N' --null-non-string '\\N' \
--num-mappers ${nummapper} \
--fields-terminated-by '|' \
--lines-terminated-by '\n'



Create a table in MySQL and insert some data:
CREATE TABLE people (PersonID int,LastName varchar(255),FirstName varchar(255),Address varchar(255),City varchar(255));

insert into people values(1,'patil', 'basan', 'address','bangalore');
insert into people values(2,'patil2', 'basan2', 'address2','bangalore2');
insert into people values(3,'patil3', 'basan3', 'address3','bangalore3');

commit;

mysql> insert into people values(1,'patil', 'basan', 'address','bangalore');
Query OK, 1 row affected (0.01 sec)

mysql> commit
    -> ;
Query OK, 0 rows affected (0.00 sec)

mysql>


Data can be sqooped without a primary key, but then we have to use a single mapper.
If the Sqoop output directory already exists, the import will throw an error.


sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table people \
--target-dir /queryresult


[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
> --username root \
> --password cloudera \
> --table people \
> --target-dir /queryresult
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:15:26 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 22:15:26 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 22:15:26 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
19/11/29 22:15:26 INFO tool.CodeGenTool: Beginning code generation
19/11/29 22:15:26 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `people` AS t LIMIT 1
19/11/29 22:15:26 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `people` AS t LIMIT 1
19/11/29 22:15:26 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/22a1f58b27b214b8bc8a6b8727c10967/people.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
19/11/29 22:15:28 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/22a1f58b27b214b8bc8a6b8727c10967/people.jar
19/11/29 22:15:28 WARN manager.MySQLManager: It looks like you are importing from mysql.
19/11/29 22:15:28 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
19/11/29 22:15:28 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
19/11/29 22:15:28 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
19/11/29 22:15:28 ERROR tool.ImportTool: Import failed: No primary key could be found for table people. Please specify one with --split-by or perform a sequential import with '-m 1'.
[cloudera@quickstart ~]$


hdfs dfs -ls /queryresult


To fix this, pass -m 1:
sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table people \
-m 1 \
--target-dir /queryresult



[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
> --username root \
> --password cloudera \
> --table people \
> -m 1 \
> --target-dir /queryresult
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:17:58 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 22:17:58 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 22:17:58 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
19/11/29 22:17:58 INFO tool.CodeGenTool: Beginning code generation
19/11/29 22:17:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `people` AS t LIMIT 1
19/11/29 22:17:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `people` AS t LIMIT 1
19/11/29 22:17:59 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/aea02172d63d47af0d50fec188aa6c21/people.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
19/11/29 22:18:00 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/aea02172d63d47af0d50fec188aa6c21/people.jar
19/11/29 22:18:00 WARN manager.MySQLManager: It looks like you are importing from mysql.
19/11/29 22:18:00 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
19/11/29 22:18:00 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
19/11/29 22:18:00 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
19/11/29 22:18:00 INFO mapreduce.ImportJobBase: Beginning import of people
19/11/29 22:18:00 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
19/11/29 22:18:00 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
19/11/29 22:18:01 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
19/11/29 22:18:01 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/11/29 22:18:03 INFO db.DBInputFormat: Using read commited transaction isolation
19/11/29 22:18:03 INFO mapreduce.JobSubmitter: number of splits:1
19/11/29 22:18:03 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1572771724749_0035
19/11/29 22:18:04 INFO impl.YarnClientImpl: Submitted application application_1572771724749_0035
19/11/29 22:18:04 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1572771724749_0035/
19/11/29 22:18:04 INFO mapreduce.Job: Running job: job_1572771724749_0035
19/11/29 22:18:10 INFO mapreduce.Job: Job job_1572771724749_0035 running in uber mode : false
19/11/29 22:18:10 INFO mapreduce.Job:  map 0% reduce 0%
19/11/29 22:18:15 INFO mapreduce.Job:  map 100% reduce 0%
19/11/29 22:18:15 INFO mapreduce.Job: Job job_1572771724749_0035 completed successfully
19/11/29 22:18:15 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=171167
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=87
HDFS: Number of bytes written=104
HDFS: Number of read operations=4
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Other local map tasks=1
Total time spent by all maps in occupied slots (ms)=3008
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=3008
Total vcore-milliseconds taken by all map tasks=3008
Total megabyte-milliseconds taken by all map tasks=3080192
Map-Reduce Framework
Map input records=3
Map output records=3
Input split bytes=87
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=32
CPU time spent (ms)=850
Physical memory (bytes) snapshot=215085056
Virtual memory (bytes) snapshot=1573986304
Total committed heap usage (bytes)=235929600
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=104
19/11/29 22:18:15 INFO mapreduce.ImportJobBase: Transferred 104 bytes in 14.0751 seconds (7.3889 bytes/sec)
19/11/29 22:18:15 INFO mapreduce.ImportJobBase: Retrieved 3 records.
[cloudera@quickstart ~]$ hdfs dfs -ls /queryresult
Found 2 items
-rw-r--r--   1 cloudera supergroup          0 2019-11-29 22:18 /queryresult/_SUCCESS
-rw-r--r--   1 cloudera supergroup        104 2019-11-29 22:18 /queryresult/part-m-00000
[cloudera@quickstart ~]$

By default 4 mappers are used. But if the table has no primary key,
only 1 mapper is used (via -m 1), so we see a single file as the output.


sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--target-dir /queryresultorders

hdfs dfs -ls /queryresultorders
hdfs dfs -cat /queryresultorders/*



[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
> --username root \
> --password cloudera \
> --table orders \
> --target-dir /queryresultorders
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:21:41 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 22:21:41 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 22:21:41 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
19/11/29 22:21:41 INFO tool.CodeGenTool: Beginning code generation
19/11/29 22:21:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
19/11/29 22:21:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
19/11/29 22:21:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/4038ef3cab402a8d089b3d928036f385/orders.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
19/11/29 22:21:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/4038ef3cab402a8d089b3d928036f385/orders.jar
19/11/29 22:21:43 WARN manager.MySQLManager: It looks like you are importing from mysql.
19/11/29 22:21:43 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
19/11/29 22:21:43 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
19/11/29 22:21:43 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
19/11/29 22:21:43 INFO mapreduce.ImportJobBase: Beginning import of orders
19/11/29 22:21:43 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
19/11/29 22:21:43 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
19/11/29 22:21:43 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
19/11/29 22:21:44 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/11/29 22:21:45 INFO db.DBInputFormat: Using read commited transaction isolation
19/11/29 22:21:45 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`order_id`), MAX(`order_id`) FROM `orders`
19/11/29 22:21:45 INFO db.IntegerSplitter: Split size: 17220; Num splits: 4 from: 1 to: 68883
19/11/29 22:21:45 INFO mapreduce.JobSubmitter: number of splits:4
19/11/29 22:21:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1572771724749_0036
19/11/29 22:21:45 INFO impl.YarnClientImpl: Submitted application application_1572771724749_0036
19/11/29 22:21:46 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1572771724749_0036/
19/11/29 22:21:46 INFO mapreduce.Job: Running job: job_1572771724749_0036
19/11/29 22:21:52 INFO mapreduce.Job: Job job_1572771724749_0036 running in uber mode : false
19/11/29 22:21:52 INFO mapreduce.Job:  map 0% reduce 0%
19/11/29 22:22:02 INFO mapreduce.Job:  map 25% reduce 0%
19/11/29 22:22:05 INFO mapreduce.Job:  map 50% reduce 0%
19/11/29 22:22:06 INFO mapreduce.Job:  map 75% reduce 0%
19/11/29 22:22:07 INFO mapreduce.Job:  map 100% reduce 0%
19/11/29 22:22:07 INFO mapreduce.Job: Job job_1572771724749_0036 completed successfully
19/11/29 22:22:07 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=685420
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=469
HDFS: Number of bytes written=2999944
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=36317
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=36317
Total vcore-milliseconds taken by all map tasks=36317
Total megabyte-milliseconds taken by all map tasks=37188608
Map-Reduce Framework
Map input records=68883
Map output records=68883
Input split bytes=469
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=335
CPU time spent (ms)=13480
Physical memory (bytes) snapshot=1053622272
Virtual memory (bytes) snapshot=6251585536
Total committed heap usage (bytes)=945291264
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=2999944
19/11/29 22:22:07 INFO mapreduce.ImportJobBase: Transferred 2.861 MB in 23.5627 seconds (124.3332 KB/sec)
19/11/29 22:22:07 INFO mapreduce.ImportJobBase: Retrieved 68883 records.
[cloudera@quickstart ~]$ hdfs dfs -ls /queryresultorders
Found 5 items
-rw-r--r--   1 cloudera supergroup          0 2019-11-29 22:22 /queryresultorders/_SUCCESS
-rw-r--r--   1 cloudera supergroup     741614 2019-11-29 22:22 /queryresultorders/part-m-00000
-rw-r--r--   1 cloudera supergroup     753022 2019-11-29 22:22 /queryresultorders/part-m-00001
-rw-r--r--   1 cloudera supergroup     752368 2019-11-29 22:22 /queryresultorders/part-m-00002
-rw-r--r--   1 cloudera supergroup     752940 2019-11-29 22:22 /queryresultorders/part-m-00003
[cloudera@quickstart ~]$

Observe that by default 4 mappers got created.



hdfs dfs -cat /queryresultorders/*

If the table is huge and has no key, a single mapper will not work well. We need to optimize (for example with --split-by).


Bringing in all of the tables

sqoop-import-all-tables \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username retail_dba \
--password cloudera \
--as-sequencefile \
-m 4 \
--warehouse-dir /user/cloudera/sqoopdir

We can specify the file format while sqooping data.
 By default it uses the text file format.
 The sequence file format is supported.
 The Avro file format is supported.
 The Parquet file format is supported.

 ORC is not supported.


 warehouse-dir vs target-dir?
 warehouse-dir : a sub-directory with the name of the table is created under it. When we are importing multiple tables
 we have to use warehouse-dir; target-dir cannot be used.


 target-dir : the imported files are written directly into the given directory (single-table imports only).



sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--warehouse-dir /orderswithwarehouse


hdfs dfs -ls /orderswithwarehouse


[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
> --username root \
> --password cloudera \
> --table orders \
> --warehouse-dir /orderswithwarehouse
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:34:23 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
19/11/29 22:34:23 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/11/29 22:34:24 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
19/11/29 22:34:24 INFO tool.CodeGenTool: Beginning code generation
19/11/29 22:34:24 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
19/11/29 22:34:24 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
19/11/29 22:34:24 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/2c43aaa902ea582daec0aa0cd99368a7/orders.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
19/11/29 22:34:25 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/2c43aaa902ea582daec0aa0cd99368a7/orders.jar
19/11/29 22:34:25 WARN manager.MySQLManager: It looks like you are importing from mysql.
19/11/29 22:34:25 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
19/11/29 22:34:25 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
19/11/29 22:34:25 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
19/11/29 22:34:25 INFO mapreduce.ImportJobBase: Beginning import of orders
19/11/29 22:34:25 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
19/11/29 22:34:26 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
19/11/29 22:34:26 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
19/11/29 22:34:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/11/29 22:34:28 INFO db.DBInputFormat: Using read commited transaction isolation
19/11/29 22:34:28 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`order_id`), MAX(`order_id`) FROM `orders`
19/11/29 22:34:28 INFO db.IntegerSplitter: Split size: 17220; Num splits: 4 from: 1 to: 68883
19/11/29 22:34:28 INFO mapreduce.JobSubmitter: number of splits:4
19/11/29 22:34:28 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1572771724749_0037
19/11/29 22:34:28 INFO impl.YarnClientImpl: Submitted application application_1572771724749_0037
19/11/29 22:34:28 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1572771724749_0037/
19/11/29 22:34:28 INFO mapreduce.Job: Running job: job_1572771724749_0037
19/11/29 22:34:36 INFO mapreduce.Job: Job job_1572771724749_0037 running in uber mode : false
19/11/29 22:34:36 INFO mapreduce.Job:  map 0% reduce 0%
19/11/29 22:34:47 INFO mapreduce.Job:  map 25% reduce 0%
19/11/29 22:34:50 INFO mapreduce.Job:  map 50% reduce 0%
19/11/29 22:34:51 INFO mapreduce.Job:  map 100% reduce 0%
19/11/29 22:34:52 INFO mapreduce.Job: Job job_1572771724749_0037 completed successfully
19/11/29 22:34:52 INFO mapreduce.Job: Counters: 31
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=685476
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=469
HDFS: Number of bytes written=2999944
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Killed map tasks=1
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=38440
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=38440
Total vcore-milliseconds taken by all map tasks=38440
Total megabyte-milliseconds taken by all map tasks=39362560
Map-Reduce Framework
Map input records=68883
Map output records=68883
Input split bytes=469
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=424
CPU time spent (ms)=13190
Physical memory (bytes) snapshot=989405184
Virtual memory (bytes) snapshot=6315798528
Total committed heap usage (bytes)=946339840
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=2999944
19/11/29 22:34:52 INFO mapreduce.ImportJobBase: Transferred 2.861 MB in 25.5098 seconds (114.8435 KB/sec)
19/11/29 22:34:52 INFO mapreduce.ImportJobBase: Retrieved 68883 records.
[cloudera@quickstart ~]$ hdfs dfs -ls /orderswithwarehouse
Found 1 items
drwxr-xr-x   - cloudera supergroup          0 2019-11-29 22:34 /orderswithwarehouse/orders
[cloudera@quickstart ~]$ hdfs dfs -ls /orderswithwarehouse/orders
Found 5 items
-rw-r--r--   1 cloudera supergroup          0 2019-11-29 22:34 /orderswithwarehouse/orders/_SUCCESS
-rw-r--r--   1 cloudera supergroup     741614 2019-11-29 22:34 /orderswithwarehouse/orders/part-m-00000
-rw-r--r--   1 cloudera supergroup     753022 2019-11-29 22:34 /orderswithwarehouse/orders/part-m-00001
-rw-r--r--   1 cloudera supergroup     752368 2019-11-29 22:34 /orderswithwarehouse/orders/part-m-00002
-rw-r--r--   1 cloudera supergroup     752940 2019-11-29 22:34 /orderswithwarehouse/orders/part-m-00003
[cloudera@quickstart ~]$



Observe that [cloudera@quickstart ~]$ hdfs dfs -ls /orderswithwarehouse/orders shows a sub-folder named after the table, created because of the warehouse-dir config.

==============================================================================================
#getting help and the version

sqoop help
sqoop version

[cloudera@quickstart ~]$ sqoop version
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:42:21 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
Sqoop 1.4.6-cdh5.13.0
git commit id
Compiled by jenkins on Wed Oct  4 11:04:44 PDT 2017
[cloudera@quickstart ~]$


sqoop help eval

#getting help of import
sqoop help import

==============================================================================================

--password : we need to enter the password in plain text on the command line

-P : it will prompt for the password


sqoop-list-databases \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
-P


[cloudera@quickstart ~]$ sqoop-list-databases \
> --connect "jdbc:mysql://quickstart.cloudera:3306" \
> --username retail_dba \
> -P
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/11/29 22:43:51 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
Enter password:
19/11/29 22:43:55 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
retail_db
[cloudera@quickstart ~]$

Observe that it prompts with "Enter password:".

==============================================================================================


sqoop eval command: -e / --query

-e and --query are the same
-m and --num-mappers are the same

==============================================================================================

redirecting logs - stdout stderr

1 is the code for stdout

2 is the code for stderr

1>query.out 2>query.err

This is needed when we automate jobs and want to inspect the logs.

sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--target-dir /queryresulttargetdir-err 1>query.out 2>query.err



[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
> --username root \
> --password cloudera \
> --table orders \
> --target-dir /queryresulttargetdir-err 1>query.out 2>query.err
;
[cloudera@quickstart ~]$ ;
bash: syntax error near unexpected token `;'
[cloudera@quickstart ~]$ ls
cloudera-manager  Desktop    Downloads  enterprise-deployment.json  kerberos  Music        parcels      Pictures  query.err  Templates  workspace
cm_api.py         Documents  eclipse    express-deployment.json     lib       orders.java  people.java  Public    query.out  Videos
[cloudera@quickstart ~]$ vi query.out
[cloudera@quickstart ~]$ vi query.err
[cloudera@quickstart ~]$ ls query.out
query.out
[cloudera@quickstart ~]$ ls query.err
query.err
[cloudera@quickstart ~]$


Observe that the files query.out and query.err were created.

==============================================================================================
sqoop import

target-dir

warehouse-dir

--append

--delete-target-dir

==============================================================================================

If there is no primary key, we will see how the import can be optimized.
==============================================================================================

What is the bounding vals query?

Sqoop runs SELECT MIN(split_col), MAX(split_col) on the split column (see the BoundingValsQuery lines in the logs above) to divide the rows among the mappers.
When there is no primary key we have to set the number of mappers to 1 or specify --split-by.

Use --split-by with a column of integral type.
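
A small worked sketch of how that bounding range is split among mappers, using the numbers from the orders log above (min 1, max 68883, 4 mappers). This is a simplified illustration, not Sqoop's exact splitter code:

val (min, max, numMappers) = (1L, 68883L, 4)
val splitSize = (max - min) / numMappers              // 17220, matching "Split size: 17220" in the log
val splits = (0 until numMappers).map { i =>
  val lo = min + i * splitSize
  val hi = if (i == numMappers - 1) max else lo + splitSize - 1
  (lo, hi)                                            // each mapper imports roughly WHERE order_id BETWEEN lo AND hi
}
// splits: Vector((1,17220), (17221,34440), (34441,51660), (51661,68883))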


==============================================================================================

file formats

--as-avrodatafile
--as-sequencefile
--as-parquetfile
==============================================================================================


By default, compression uses the gzip codec.

To use a specific codec,
pass --compression-codec, e.g. SnappyCodec.

--compress or -z (default compression is gzip and we will see a .gz extension)

--compression-codec (specify the compression algorithm)

The available codecs are listed in /etc/hadoop/conf/core-site.xml.

--compress
--compression-codec org.apache.hadoop.io.compress.SnappyCodec

==============================================================================================
















Sunday, November 24, 2019

Dataframe code with take and show methods


package com.basan.day4.df

import org.apache.log4j._
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession

/**
 * 0 Will 33 385
 * 1 Jean-Luc 26 2
 * 2 Hugh 55 221
 * 3 Deanna 40 465
 * 4 Quark 68 21
 * 5 Weyoun 59 318
 * 6 Gowron 37 220
 * 7 Will 54 307*
 */

object DataFrameExampleSQL {

  case class Person(ID: Int, name: String, age: Int, numFriends: Int)

  def mapper(line: String): Person = {
    val fields = line.split(",")
    val person: Person = Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
    person
  }

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .appName("Data frame test using SQL")
      .master("local[*]")
      //needed only in windows check it
      // .config("spark.sql.warehouse.dir", "/Users/basan/Documents/Spark/tempwarehouse")
      // .config("spark.driver.allowMultipleContexts", "true")
      .getOrCreate()

    // val sc = new SparkContext("local[*]", "ERROR count with bigFile")

    var lines = spark.sparkContext.textFile("/Users/basan/workspace-spark/sparkDemo1/spark-data/friends-data.csv")
    import spark.implicits._
    val schemaPeople = lines.map(mapper).toDS()

    schemaPeople.createOrReplaceTempView("people")

    //SQL on the dataframe

    val teenagers = spark.sql("SELECT * from people where age>=13 AND age<=19")
        print("show-----")

    teenagers.show(2)
    print("take-----")
    teenagers.take(2)

    val results = teenagers.collect()

    results.foreach(println)

    scala.io.StdIn.readLine()

    spark.stop()

  }
 
 
  //look into the spark UI and SQL tab observe WholeStageCodegen block is generated
  //Check the plan to see how queries gets executed and tuning can be done using [plans]
 
 

}

DataFrame Sample code

object DataFrameExample {

  case class Person(ID: Int, name: String, age: Int, numFriends: Int)

  def mapper(line: String): Person = {
    val fields = line.split(",")
    val person: Person = Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
    person
  }

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .appName("Data frame test")
      .master("local[*]")
      //needed only in windows check it
      // .config("spark.sql.warehouse.dir", "/Users/basan/Documents/Spark/tempwarehouse")
      //.config("spark.driver.allowMultipleContexts", "true")
      .getOrCreate()

    // val sc = new SparkContext("local[*]", "ERROR count with bigFile")

    var lines = spark.sparkContext.textFile("/Users/basan/workspace-spark/sparkDemo1/spark-data/friends-data.csv")
    import spark.implicits._
    val people = lines.map(mapper).toDS().cache()

    println("Inferred schema")
    people.printSchema()

    println("select the name column")
    people.select("name").show()

    println("Filter out anyone over 21")
    people.filter(people("age") < 21).show()

    println("group by age")
    people.groupBy("age").count().show()

    println("make everyone 10 years older")
    people.select(people("name"), (people("age") + 10)).show()
    people.select(people("name"), ((people("age") + 10)).alias("newcolumn")).show()


    spark.stop()

  }

}

Spark Submit - logs

cbc32a57245:bin z002qhl$ ./spark-submit --class com.basan.day3.ErrorWarnCount /Users/z002qhl/Desktop/sparkDemo.jar
2019-11-24 16:05:37 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-11-24 16:05:37 INFO  SparkContext:54 - Running Spark version 2.3.2
2019-11-24 16:05:37 INFO  SparkContext:54 - Submitted application: ERROR count
2019-11-24 16:05:37 INFO  SecurityManager:54 - Changing view acls to: z002qhl
2019-11-24 16:05:37 INFO  SecurityManager:54 - Changing modify acls to: z002qhl
2019-11-24 16:05:37 INFO  SecurityManager:54 - Changing view acls groups to:
2019-11-24 16:05:37 INFO  SecurityManager:54 - Changing modify acls groups to:
2019-11-24 16:05:37 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(z002qhl); groups with view permissions: Set(); users  with modify permissions: Set(z002qhl); groups with modify permissions: Set()
2019-11-24 16:05:38 INFO  Utils:54 - Successfully started service 'sparkDriver' on port 49223.
2019-11-24 16:05:38 INFO  SparkEnv:54 - Registering MapOutputTracker
2019-11-24 16:05:38 INFO  SparkEnv:54 - Registering BlockManagerMaster
2019-11-24 16:05:38 INFO  BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2019-11-24 16:05:38 INFO  BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up
2019-11-24 16:05:38 INFO  DiskBlockManager:54 - Created local directory at /private/var/folders/yh/xllvl9pd01g_skzq6_wg_3cst3pfgf/T/blockmgr-4d4dad0c-2af2-4bed-9246-1b3f1df4b40e
2019-11-24 16:05:38 INFO  MemoryStore:54 - MemoryStore started with capacity 366.3 MB
2019-11-24 16:05:38 INFO  SparkEnv:54 - Registering OutputCommitCoordinator
2019-11-24 16:05:38 INFO  log:192 - Logging initialized @31818ms
2019-11-24 16:05:38 INFO  Server:351 - jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
2019-11-24 16:05:38 INFO  Server:419 - Started @31906ms
2019-11-24 16:05:38 INFO  AbstractConnector:278 - Started ServerConnector@263f04ca{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-11-24 16:05:38 INFO  Utils:54 - Successfully started service 'SparkUI' on port 4040.
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2555fff0{/jobs,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@15bcf458{/jobs/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5af9926a{/jobs/job,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@fac80{/jobs/job/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@726386ed{/stages,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@649f2009{/stages/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@14bb2297{/stages/stage,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1a15b789{/stages/stage/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@57f791c6{/stages/pool,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@51650883{/stages/pool/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6c4f9535{/storage,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5bd1ceca{/storage/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@30c31dd7{/storage/rdd,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@499b2a5c{/storage/rdd/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@596df867{/environment,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@c1fca1e{/environment/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@241a53ef{/executors,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@344344fa{/executors/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2db2cd5{/executors/threadDump,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@70e659aa{/executors/threadDump/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@615f972{/static,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@70e0accd{/,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7957dc72{/api,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@82c57b3{/jobs/job/kill,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5be82d43{/stages/stage/kill,null,AVAILABLE,@Spark}
2019-11-24 16:05:38 INFO  SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://192.168.20.34:4040
2019-11-24 16:05:38 INFO  SparkContext:54 - Added JAR file:/Users/z002qhl/Desktop/sparkDemo.jar at spark://192.168.20.34:49223/jars/sparkDemo.jar with timestamp 1574591738675
2019-11-24 16:05:38 INFO  Executor:54 - Starting executor ID driver on host localhost
2019-11-24 16:05:38 INFO  Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49224.
2019-11-24 16:05:38 INFO  NettyBlockTransferService:54 - Server created on 192.168.20.34:49224
2019-11-24 16:05:38 INFO  BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2019-11-24 16:05:38 INFO  BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, 192.168.20.34, 49224, None)
2019-11-24 16:05:38 INFO  BlockManagerMasterEndpoint:54 - Registering block manager 192.168.20.34:49224 with 366.3 MB RAM, BlockManagerId(driver, 192.168.20.34, 49224, None)
2019-11-24 16:05:38 INFO  BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, 192.168.20.34, 49224, None)
2019-11-24 16:05:38 INFO  BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, 192.168.20.34, 49224, None)
2019-11-24 16:05:38 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3703bf3c{/metrics/json,null,AVAILABLE,@Spark}
2019-11-24 16:05:39 INFO  SparkContext:54 - Starting job: collect at ErrorWarnCount.scala:32
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Registering RDD 1 (map at ErrorWarnCount.scala:23)
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Got job 0 (collect at ErrorWarnCount.scala:32) with 8 output partitions
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Final stage: ResultStage 1 (collect at ErrorWarnCount.scala:32)
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Parents of final stage: List(ShuffleMapStage 0)
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Missing parents: List(ShuffleMapStage 0)
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Submitting ShuffleMapStage 0 (MapPartitionsRDD[1] at map at ErrorWarnCount.scala:23), which has no missing parents
2019-11-24 16:05:39 INFO  MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 3.1 KB, free 366.3 MB)
2019-11-24 16:05:39 INFO  MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 2008.0 B, free 366.3 MB)
2019-11-24 16:05:39 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on 192.168.20.34:49224 (size: 2008.0 B, free: 366.3 MB)
2019-11-24 16:05:39 INFO  SparkContext:54 - Created broadcast 0 from broadcast at DAGScheduler.scala:1039
2019-11-24 16:05:39 INFO  DAGScheduler:54 - Submitting 8 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[1] at map at ErrorWarnCount.scala:23) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
2019-11-24 16:05:39 INFO  TaskSchedulerImpl:54 - Adding task set 0.0 with 8 tasks
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7878 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 3.0 in stage 0.0 (TID 3, localhost, executor driver, partition 3, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 4.0 in stage 0.0 (TID 4, localhost, executor driver, partition 4, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 5.0 in stage 0.0 (TID 5, localhost, executor driver, partition 5, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 6.0 in stage 0.0 (TID 6, localhost, executor driver, partition 6, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:39 INFO  TaskSetManager:54 - Starting task 7.0 in stage 0.0 (TID 7, localhost, executor driver, partition 7, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 5.0 in stage 0.0 (TID 5)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 7.0 in stage 0.0 (TID 7)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 3.0 in stage 0.0 (TID 3)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 6.0 in stage 0.0 (TID 6)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 0.0 in stage 0.0 (TID 0)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 4.0 in stage 0.0 (TID 4)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 2.0 in stage 0.0 (TID 2)
2019-11-24 16:05:39 INFO  Executor:54 - Running task 1.0 in stage 0.0 (TID 1)
2019-11-24 16:05:39 INFO  Executor:54 - Fetching spark://192.168.20.34:49223/jars/sparkDemo.jar with timestamp 1574591738675
2019-11-24 16:05:39 INFO  TransportClientFactory:267 - Successfully created connection to /192.168.20.34:49223 after 34 ms (0 ms spent in bootstraps)
2019-11-24 16:05:39 INFO  Utils:54 - Fetching spark://192.168.20.34:49223/jars/sparkDemo.jar to /private/var/folders/yh/xllvl9pd01g_skzq6_wg_3cst3pfgf/T/spark-bf924cf5-40e0-47c7-a7aa-e08fc7f4a865/userFiles-3d0f3b95-2c81-4840-811c-4581c1c333e3/fetchFileTemp4604090937888069829.tmp
2019-11-24 16:05:40 INFO  Executor:54 - Adding file:/private/var/folders/yh/xllvl9pd01g_skzq6_wg_3cst3pfgf/T/spark-bf924cf5-40e0-47c7-a7aa-e08fc7f4a865/userFiles-3d0f3b95-2c81-4840-811c-4581c1c333e3/sparkDemo.jar to class loader
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 7.0 in stage 0.0 (TID 7). 1075 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 0.0 in stage 0.0 (TID 0). 989 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 1.0 in stage 0.0 (TID 1). 1075 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 2.0 in stage 0.0 (TID 2). 989 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 5.0 in stage 0.0 (TID 5). 1075 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 4.0 in stage 0.0 (TID 4). 989 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 3.0 in stage 0.0 (TID 3). 1075 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 6.0 in stage 0.0 (TID 6). 989 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 2.0 in stage 0.0 (TID 2) in 451 ms on localhost (executor driver) (1/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 7.0 in stage 0.0 (TID 7) in 451 ms on localhost (executor driver) (2/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 0.0 in stage 0.0 (TID 0) in 481 ms on localhost (executor driver) (3/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 1.0 in stage 0.0 (TID 1) in 456 ms on localhost (executor driver) (4/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 4.0 in stage 0.0 (TID 4) in 455 ms on localhost (executor driver) (5/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 3.0 in stage 0.0 (TID 3) in 456 ms on localhost (executor driver) (6/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 5.0 in stage 0.0 (TID 5) in 455 ms on localhost (executor driver) (7/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 6.0 in stage 0.0 (TID 6) in 454 ms on localhost (executor driver) (8/8)
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool
2019-11-24 16:05:40 INFO  DAGScheduler:54 - ShuffleMapStage 0 (map at ErrorWarnCount.scala:23) finished in 0.693 s
2019-11-24 16:05:40 INFO  DAGScheduler:54 - looking for newly runnable stages
2019-11-24 16:05:40 INFO  DAGScheduler:54 - running: Set()
2019-11-24 16:05:40 INFO  DAGScheduler:54 - waiting: Set(ResultStage 1)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - failed: Set()
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting ResultStage 1 (ShuffledRDD[2] at reduceByKey at ErrorWarnCount.scala:31), which has no missing parents
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_1 stored as values in memory (estimated size 3.2 KB, free 366.3 MB)
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_1_piece0 stored as bytes in memory (estimated size 2031.0 B, free 366.3 MB)
2019-11-24 16:05:40 INFO  BlockManagerInfo:54 - Added broadcast_1_piece0 in memory on 192.168.20.34:49224 (size: 2031.0 B, free: 366.3 MB)
2019-11-24 16:05:40 INFO  SparkContext:54 - Created broadcast 1 from broadcast at DAGScheduler.scala:1039
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting 8 missing tasks from ResultStage 1 (ShuffledRDD[2] at reduceByKey at ErrorWarnCount.scala:31) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Adding task set 1.0 with 8 tasks
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 1.0 in stage 1.0 (TID 8, localhost, executor driver, partition 1, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 3.0 in stage 1.0 (TID 9, localhost, executor driver, partition 3, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 4.0 in stage 1.0 (TID 10, localhost, executor driver, partition 4, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 5.0 in stage 1.0 (TID 11, localhost, executor driver, partition 5, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 6.0 in stage 1.0 (TID 12, localhost, executor driver, partition 6, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 7.0 in stage 1.0 (TID 13, localhost, executor driver, partition 7, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 0.0 in stage 1.0 (TID 14, localhost, executor driver, partition 0, ANY, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 2.0 in stage 1.0 (TID 15, localhost, executor driver, partition 2, ANY, 7649 bytes)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 3.0 in stage 1.0 (TID 9)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 2.0 in stage 1.0 (TID 15)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 5.0 in stage 1.0 (TID 11)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 7.0 in stage 1.0 (TID 13)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 0.0 in stage 1.0 (TID 14)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 6.0 in stage 1.0 (TID 12)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 4.0 in stage 1.0 (TID 10)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 1.0 in stage 1.0 (TID 8)
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 1 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 3 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 8 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 8 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 9 ms
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 3.0 in stage 1.0 (TID 9). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 7.0 in stage 1.0 (TID 13). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 4.0 in stage 1.0 (TID 10). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 5.0 in stage 1.0 (TID 11). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 6.0 in stage 1.0 (TID 12). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 1.0 in stage 1.0 (TID 8). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 7.0 in stage 1.0 (TID 13) in 48 ms on localhost (executor driver) (1/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 3.0 in stage 1.0 (TID 9) in 50 ms on localhost (executor driver) (2/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 4.0 in stage 1.0 (TID 10) in 50 ms on localhost (executor driver) (3/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 5.0 in stage 1.0 (TID 11) in 50 ms on localhost (executor driver) (4/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 6.0 in stage 1.0 (TID 12) in 50 ms on localhost (executor driver) (5/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 1.0 in stage 1.0 (TID 8) in 53 ms on localhost (executor driver) (6/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 2.0 in stage 1.0 (TID 15). 1284 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 2.0 in stage 1.0 (TID 15) in 54 ms on localhost (executor driver) (7/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 0.0 in stage 1.0 (TID 14). 1285 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 0.0 in stage 1.0 (TID 14) in 56 ms on localhost (executor driver) (8/8)
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Removed TaskSet 1.0, whose tasks have all completed, from pool
2019-11-24 16:05:40 INFO  DAGScheduler:54 - ResultStage 1 (collect at ErrorWarnCount.scala:32) finished in 0.092 s
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Job 0 finished: collect at ErrorWarnCount.scala:32, took 1.088511 s
(ERROR ,3)
(WARN ,1)
single line conversion
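
The output above comes from ErrorWarnCount.scala, whose source is not reproduced in this post. Below is a minimal sketch of what the first job (map at line 23, reduceByKey/collect at lines 31-32) most likely does: key each log line by its level token and sum the pairs per level. The sample lines, paths, and variable names here are assumptions for illustration; the real job probably reads a log file with sc.textFile. The trailing space in "(ERROR ,3)" suggests the level token is taken up to a ":" delimiter, space included.

import org.apache.spark.sql.SparkSession

object ErrorWarnCount {
  def main(args: Array[String]): Unit = {
    // Local run matching the log above: app name "ERROR count", all local cores (hence 8 partitions)
    val spark = SparkSession.builder()
      .appName("ERROR count")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Assumed sample input; the real job would use something like sc.textFile("<log file path>")
    val logLines = sc.parallelize(Seq(
      "ERROR : connection refused",
      "WARN : low disk space",
      "ERROR : connection refused",
      "ERROR : request timed out"))

    // Step 1: map each line to (level, 1), keeping the text before ':'
    val levelPairs = logLines.map(line => (line.split(":")(0), 1))
    // Step 2: sum counts per level, collect to the driver and print
    val counts = levelPairs.reduceByKey(_ + _).collect()
    counts.foreach(println)            // prints (ERROR ,3) and (WARN ,1) as in the log above

    println("single line conversion")  // marker printed before the second job

    // (second job: the "single line conversion" form, sketched after its log below)
    spark.stop()
  }
}
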
2019-11-24 16:05:40 INFO  SparkContext:54 - Starting job: collect at ErrorWarnCount.scala:39
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Registering RDD 4 (map at ErrorWarnCount.scala:37)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Got job 1 (collect at ErrorWarnCount.scala:39) with 8 output partitions
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Final stage: ResultStage 3 (collect at ErrorWarnCount.scala:39)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Parents of final stage: List(ShuffleMapStage 2)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Missing parents: List(ShuffleMapStage 2)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting ShuffleMapStage 2 (MapPartitionsRDD[4] at map at ErrorWarnCount.scala:37), which has no missing parents
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_2 stored as values in memory (estimated size 3.1 KB, free 366.3 MB)
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_2_piece0 stored as bytes in memory (estimated size 2010.0 B, free 366.3 MB)
2019-11-24 16:05:40 INFO  BlockManagerInfo:54 - Added broadcast_2_piece0 in memory on 192.168.20.34:49224 (size: 2010.0 B, free: 366.3 MB)
2019-11-24 16:05:40 INFO  SparkContext:54 - Created broadcast 2 from broadcast at DAGScheduler.scala:1039
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting 8 missing tasks from ShuffleMapStage 2 (MapPartitionsRDD[4] at map at ErrorWarnCount.scala:37) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Adding task set 2.0 with 8 tasks
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 0.0 in stage 2.0 (TID 16, localhost, executor driver, partition 0, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 1.0 in stage 2.0 (TID 17, localhost, executor driver, partition 1, PROCESS_LOCAL, 7878 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 2.0 in stage 2.0 (TID 18, localhost, executor driver, partition 2, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 3.0 in stage 2.0 (TID 19, localhost, executor driver, partition 3, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 4.0 in stage 2.0 (TID 20, localhost, executor driver, partition 4, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 5.0 in stage 2.0 (TID 21, localhost, executor driver, partition 5, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 6.0 in stage 2.0 (TID 22, localhost, executor driver, partition 6, PROCESS_LOCAL, 7844 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 7.0 in stage 2.0 (TID 23, localhost, executor driver, partition 7, PROCESS_LOCAL, 7879 bytes)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 1.0 in stage 2.0 (TID 17)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 5.0 in stage 2.0 (TID 21)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 4.0 in stage 2.0 (TID 20)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 2.0 in stage 2.0 (TID 18)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 3.0 in stage 2.0 (TID 19)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 0.0 in stage 2.0 (TID 16)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 6.0 in stage 2.0 (TID 22)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 7.0 in stage 2.0 (TID 23)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 2.0 in stage 2.0 (TID 18). 946 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 6.0 in stage 2.0 (TID 22). 946 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 2.0 in stage 2.0 (TID 18) in 15 ms on localhost (executor driver) (1/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 6.0 in stage 2.0 (TID 22) in 14 ms on localhost (executor driver) (2/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 7.0 in stage 2.0 (TID 23). 1032 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 4.0 in stage 2.0 (TID 20). 946 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 7.0 in stage 2.0 (TID 23) in 15 ms on localhost (executor driver) (3/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 5.0 in stage 2.0 (TID 21). 1032 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 4.0 in stage 2.0 (TID 20) in 17 ms on localhost (executor driver) (4/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 5.0 in stage 2.0 (TID 21) in 18 ms on localhost (executor driver) (5/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 0.0 in stage 2.0 (TID 16). 946 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 3.0 in stage 2.0 (TID 19). 1032 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 1.0 in stage 2.0 (TID 17). 1032 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 0.0 in stage 2.0 (TID 16) in 22 ms on localhost (executor driver) (6/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 3.0 in stage 2.0 (TID 19) in 20 ms on localhost (executor driver) (7/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 1.0 in stage 2.0 (TID 17) in 22 ms on localhost (executor driver) (8/8)
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Removed TaskSet 2.0, whose tasks have all completed, from pool
2019-11-24 16:05:40 INFO  DAGScheduler:54 - ShuffleMapStage 2 (map at ErrorWarnCount.scala:37) finished in 0.030 s
2019-11-24 16:05:40 INFO  DAGScheduler:54 - looking for newly runnable stages
2019-11-24 16:05:40 INFO  DAGScheduler:54 - running: Set()
2019-11-24 16:05:40 INFO  DAGScheduler:54 - waiting: Set(ResultStage 3)
2019-11-24 16:05:40 INFO  DAGScheduler:54 - failed: Set()
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting ResultStage 3 (ShuffledRDD[5] at reduceByKey at ErrorWarnCount.scala:39), which has no missing parents
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_3 stored as values in memory (estimated size 3.3 KB, free 366.3 MB)
2019-11-24 16:05:40 INFO  MemoryStore:54 - Block broadcast_3_piece0 stored as bytes in memory (estimated size 2027.0 B, free 366.3 MB)
2019-11-24 16:05:40 INFO  BlockManagerInfo:54 - Added broadcast_3_piece0 in memory on 192.168.20.34:49224 (size: 2027.0 B, free: 366.3 MB)
2019-11-24 16:05:40 INFO  SparkContext:54 - Created broadcast 3 from broadcast at DAGScheduler.scala:1039
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Submitting 8 missing tasks from ResultStage 3 (ShuffledRDD[5] at reduceByKey at ErrorWarnCount.scala:39) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Adding task set 3.0 with 8 tasks
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 1.0 in stage 3.0 (TID 24, localhost, executor driver, partition 1, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 3.0 in stage 3.0 (TID 25, localhost, executor driver, partition 3, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 4.0 in stage 3.0 (TID 26, localhost, executor driver, partition 4, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 5.0 in stage 3.0 (TID 27, localhost, executor driver, partition 5, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 6.0 in stage 3.0 (TID 28, localhost, executor driver, partition 6, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 7.0 in stage 3.0 (TID 29, localhost, executor driver, partition 7, PROCESS_LOCAL, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 0.0 in stage 3.0 (TID 30, localhost, executor driver, partition 0, ANY, 7649 bytes)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Starting task 2.0 in stage 3.0 (TID 31, localhost, executor driver, partition 2, ANY, 7649 bytes)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 3.0 in stage 3.0 (TID 25)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 5.0 in stage 3.0 (TID 27)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 4.0 in stage 3.0 (TID 26)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 1.0 in stage 3.0 (TID 24)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 0.0 in stage 3.0 (TID 30)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 2.0 in stage 3.0 (TID 31)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 7.0 in stage 3.0 (TID 29)
2019-11-24 16:05:40 INFO  Executor:54 - Running task 6.0 in stage 3.0 (TID 28)
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 1 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 1 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 3 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 1 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 1 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 2 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 1 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 1 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 8 blocks
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 2 ms
2019-11-24 16:05:40 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 2 ms
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 3.0 in stage 3.0 (TID 25). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 6.0 in stage 3.0 (TID 28). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 5.0 in stage 3.0 (TID 27). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 3.0 in stage 3.0 (TID 25) in 17 ms on localhost (executor driver) (1/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 2.0 in stage 3.0 (TID 31). 1327 bytes result sent to driver
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 7.0 in stage 3.0 (TID 29). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 6.0 in stage 3.0 (TID 28) in 17 ms on localhost (executor driver) (2/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 0.0 in stage 3.0 (TID 30). 1285 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 5.0 in stage 3.0 (TID 27) in 18 ms on localhost (executor driver) (3/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 4.0 in stage 3.0 (TID 26). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 2.0 in stage 3.0 (TID 31) in 16 ms on localhost (executor driver) (4/8)
2019-11-24 16:05:40 INFO  Executor:54 - Finished task 1.0 in stage 3.0 (TID 24). 1134 bytes result sent to driver
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 7.0 in stage 3.0 (TID 29) in 18 ms on localhost (executor driver) (5/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 0.0 in stage 3.0 (TID 30) in 18 ms on localhost (executor driver) (6/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 4.0 in stage 3.0 (TID 26) in 20 ms on localhost (executor driver) (7/8)
2019-11-24 16:05:40 INFO  TaskSetManager:54 - Finished task 1.0 in stage 3.0 (TID 24) in 21 ms on localhost (executor driver) (8/8)
2019-11-24 16:05:40 INFO  TaskSchedulerImpl:54 - Removed TaskSet 3.0, whose tasks have all completed, from pool
2019-11-24 16:05:40 INFO  DAGScheduler:54 - ResultStage 3 (collect at ErrorWarnCount.scala:39) finished in 0.037 s
2019-11-24 16:05:40 INFO  DAGScheduler:54 - Job 1 finished: collect at ErrorWarnCount.scala:39, took 0.073152 s
(ERROR ,3)
(WARN ,1)
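
The second job ("single line conversion", map at line 37 and reduceByKey/collect at line 39) appears to be the same computation written as one chained expression. Because it builds a new RDD lineage ("Registering RDD 4" above), Spark schedules a fresh ShuffleMapStage 2 and ResultStage 3 instead of reusing the earlier shuffle output, which is why the log repeats a full shuffle and reduce cycle. A sketch, continuing with the same sc and logLines from the snippet above:

    logLines.map(line => (line.split(":")(0), 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)   // again prints (ERROR ,3) and (WARN ,1)
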
2019-11-24 16:05:40 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2019-11-24 16:05:40 INFO  AbstractConnector:318 - Stopped Spark@263f04ca{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-11-24 16:05:40 INFO  SparkUI:54 - Stopped Spark web UI at http://192.168.20.34:4040
2019-11-24 16:05:40 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2019-11-24 16:05:40 INFO  MemoryStore:54 - MemoryStore cleared
2019-11-24 16:05:40 INFO  BlockManager:54 - BlockManager stopped
2019-11-24 16:05:40 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2019-11-24 16:05:40 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-11-24 16:05:40 INFO  SparkContext:54 - Successfully stopped SparkContext
2019-11-24 16:05:40 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-11-24 16:05:40 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/yh/xllvl9pd01g_skzq6_wg_3cst3pfgf/T/spark-bf924cf5-40e0-47c7-a7aa-e08fc7f4a865
2019-11-24 16:05:40 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/yh/xllvl9pd01g_skzq6_wg_3cst3pfgf/T/spark-4b1486ba-9b64-47d2-931d-941204c6f84e
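
The run above was launched from the jar shown in the log (file:/Users/z002qhl/Desktop/sparkDemo.jar) in local mode. A spark-submit command along these lines would reproduce such a run; the main class name (and any package prefix) is an assumption, the jar path is taken from the log:

spark-submit \
  --class ErrorWarnCount \
  --master "local[*]" \
  /Users/z002qhl/Desktop/sparkDemo.jar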