import java.util.UUID

import com.typesafe.config.Config
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, _}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
import org.apache.spark.sql.functions._

import scala.collection.mutable
import scala.io.Source

/**
 * Creates the dataset by passing the RDD through the various steps.
 */
object DataHandlerService extends DataHandlerTrait {

  // Loads a classpath resource (e.g. the JSON schema file) as a string.
  def loadResource(filename: String) = {
    val path = getClass.getResource(filename)
    // Source.fromInputStream(getClass.getResourceAsStream("/war-and-peace.txt")).getLines().foreach(println)
    val source = Source.fromURL(path)
    try source.mkString finally source.close()
  }

  def processMessages(messages: InputDStream[ConsumerRecord[String, String]],
                      sparkSession: SparkSession,
                      errorDirPath: String,
                      successPath: String,
                      kafkaSink: Broadcast[KafkaSinkService],
                      brValidators: Broadcast[mutable.Map[String, Array[Validator]]],
                      brDefaultValues: Broadcast[mutable.Map[String, Any]],
                      brTransFormerMap: Broadcast[mutable.Map[String, AttributeTransformer]],
                      brConfig: Broadcast[Config]): Unit = {

    var schema = SchemaService.getSchemaDefinition()
    val encoder = RowEncoder.apply(schema)

    // Build the target schema from the JSON schema file on the classpath.
    val fileContents = loadResource("/test-schema")
    print(fileContents)
    val schema3 = DataType.fromJson(fileContents).asInstanceOf[StructType]
    print(schema3)

    messages.foreachRDD(rdd => {
      import sparkSession.implicits._
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rdd.isEmpty() && rdd.count() > 0) {
        val dataSet: Dataset[String] = sparkSession.createDataset(rdd.map(x => x.value()))
        print(dataSet.show(2))

        //perform the logic on the rdd
        // Parse the JSON payloads against the schema; malformed records land in corrupt_record.
        var streamData = sparkSession.read
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "corrupt_record")
          .schema(schema3)
          .json(dataSet)
        // val streamData = sparkSession.read.schema(schema3).json(dataSet)
        // streamData.write.json("/Users/z002qhl/Documents/Spark/SparkProjects/sparkStreaming/table")
        // streamData.write.mode("append").option("compression","snappy").format("orc").saveAsTable("z002qhl.testtable")
        print("schema::" + streamData.printSchema())
        // streamData.foreach(mes => println("msg:" + mes.mkString))

        val generateUUID = udf(() => UUID.randomUUID().toString)
        // val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
        // streamData.withColumn("ts", ts)

        // Decorate the parsed records with audit and derived columns.
        streamData = streamData.withColumn("decorated", lit(1))
          .withColumn("current_date", current_date())
          .withColumn("unix_timestamp", unix_timestamp())
          .withColumn("Next_processing_date", date_add(current_date(), 10))
          .withColumn("uuid", generateUUID())
          .withColumn("concatedrows", concat('action, 'atp_tracking_id))
          .withColumn("casting", $"action".substr(0, 3))
        print(streamData.show(5))
      }
      messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
  }
}
Tuesday, October 29, 2019
Read message and column transformations
Kafka message validation
val fileContents = loadResource("/input-schema")
print(fileContents)
val schema3 = DataType.fromJson(fileContents).asInstanceOf[StructType]
print(schema3)

messages.foreachRDD(rdd => {
  import sparkSession.implicits._
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  if (!rdd.isEmpty() && rdd.count() > 0) {
    val dataSet: Dataset[String] = sparkSession.createDataset(rdd.map(x => x.value()))
    print(dataSet.show(2))

    //perform the logic on the rdd
    val streamData = sparkSession.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "corrupt_record")
      .schema(schema3)
      .json(dataSet)
    // val streamData = sparkSession.read.schema(schema3).json(dataSet)
    print(streamData.show(5))
    // streamData.write.json("/Users/z002qhl/Documents/Spark/SparkProjects/sparkStreaming/table")
    // streamData.write.mode("append").option("compression","snappy").format("orc").saveAsTable("z002qhl.testtable")
    print("schema::" + streamData.printSchema())
    streamData.map(mes => mes.mkString)
  }
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
def loadResource(filename: String) = {
  val path = getClass.getResource(filename)
  // Source.fromInputStream(getClass.getResourceAsStream("/war-and-peace.txt")).getLines().foreach(println)
  val source = Source.fromURL(path)
  try source.mkString finally source.close()
}

The schema file referenced above ("/input-schema") contains:

{
  "type": "struct",
  "fields": [
    { "name": "action", "type": "string", "nullable": true, "metadata": { } },
    { "name": "tracking_id", "type": "string", "nullable": true, "metadata": { } },
    { "name": "corrupt_record", "type": "string", "nullable": true, "metadata": { } }
  ]
}
Sunday, October 27, 2019
Schema validation of all the fields in json
You can try the following code to read the JSON file
based on Schema in Spark 2.2
import org.apache.spark.sql.types.{DataType, StructType}
//Read Json Schema and Create
Schema_Json
val schema_json=spark.read.json("/user/Files/ActualJson.json").schema.json
//add the schema
val newSchema=DataType.fromJson(schema_json).asInstanceOf[StructType]
//read the json files based on schema
val df=spark.read.schema(newSchema).json("Json_Files/Folder Path")
Friday, October 25, 2019
DAG Stage Jobs Tasks relationship
In this article, I will try to explain how Spark works internally and what the components of execution are: jobs, tasks, and stages.
Job Tasks stages
https://dzone.com/articles/how-spark-internally-executes-a-program
So once you perform any action on an RDD, the Spark context submits your program
to the driver.
The driver creates the DAG (directed acyclic graph), or execution plan (job),
for your program. Once the DAG is created, the driver divides it into
a number of stages. These stages are then divided into smaller tasks,
and all the tasks are given to the executors for execution.
But why did Spark divide this program into two stages? Why not more or fewer than two? Basically, it depends on shuffling: whenever you perform a transformation where Spark needs to shuffle data by communicating with other partitions, it creates a new stage for that transformation. Transformations that do not require shuffling stay within a single stage.
But why did Spark create only two tasks for each stage? It depends on your number of partitions.
DAG (job) --> splits into a number of stages --> stages split into tasks --> tasks are given to the executors --> executors run on the machines.
One machine can have multiple executors with multiple cores.
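As a minimal illustration (a sketch, assuming the usual sc available in spark-shell): the map below is a narrow transformation and stays in the first stage, while reduceByKey forces a shuffle and starts a second stage; the number of tasks in each stage equals the number of partitions.

// Two partitions => two tasks per stage.
val words = sc.parallelize(Seq("to", "be", "or", "not", "to", "be"), 2)

// Narrow transformation: no shuffle, stays in stage 0.
val pairs = words.map(word => (word, 1))

// reduceByKey needs all values of a key on one partition, so Spark shuffles here and creates a second stage.
val counts = pairs.reduceByKey(_ + _)

// The action triggers the job; the Spark UI shows 2 stages with 2 tasks each.
counts.collect().foreach(println)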
Machine allocation executor and core
6 machines
16 cores per machine - 96 cores total
64 GB RAM per machine
smallest possible executor
1 core per executor - so we can have 96 executors
largest possible executor
6 executors (one per machine), each executor having all 16 cores
leaves no resources for other processes
best approach
96 cores: leave 1 core per machine for the OS
that leaves 90 cores in total, i.e. 15 cores per machine
with 15 cores per machine we can have 3 executors with 5 cores each
64 GB minus ~1 GB for the OS leaves 63 GB; 63 / 3 = 21 GB per executor; leave ~2 GB for YARN overhead, giving 19 GB per executor
So each machine will have 3 executors, each executor having 5 cores and 19 GB
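A sketch of how these numbers might be passed to Spark (the property names are standard Spark-on-YARN settings; the figure of 17 executor instances assumes one of the 18 slots is left for the YARN application master):

import org.apache.spark.sql.SparkSession

// 6 machines x 3 executors = 18 slots; one slot is commonly reserved
// for the YARN application master, leaving 17 executors.
val spark = SparkSession.builder()
  .appName("executor-sizing-example")
  .config("spark.executor.instances", "17")
  .config("spark.executor.cores", "5")
  .config("spark.executor.memory", "19g")
  .getOrCreate()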
Sunday, October 20, 2019
Sqoop basics
Day2- sqoop
Only mappers will run. There will not be any reducers.
HDFS to RDBMS can be done - sqoop export
RDBMS to HDFS can be done which is called sqoop import
By default 4 mappers will be assigned for the sqoop import execution
To look at tables from the Hadoop environment
To list databases
sqoop-list-databases \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
--password cloudera
To check list of tables in a particular db
sqoop-list-tables \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username retail_dba \
--password cloudera
To check records in the table
sqoop-eval \
--connect "jdbc:mysql://quickstart.cloudera:3306" \
--username retail_dba \
--password cloudera \
--query "select * from retail_db.customers limit 10"
create a table in sql and insert some data
CREATE TABLE people (PersonID int,LastName varchar(255),FirstName varchar(255),Address varchar(255),City varchar(255));
To sqoop just one table, run the command below
========================================
sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--m 1 \
--target-dir /queryresult
--m specifies the number of mappers
--target-dir specifies where to copy the table contents
--target-dir must not already exist; Sqoop will create it
Increasing the number of mappers will hit DB performance; it is bound by the number of connections
For importing all the tables from the database
sqoop-import-all-tables \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username retail_dba \
--password cloudera \
--as-sequencefile \
-m 4 \
--warehouse-dir /user/cloudera/sqoopdir
--as-sequencefile sets the output file format.
It will import the tables one by one.
The directory /user/cloudera/sqoopdir will have one subfolder per table, named after the table.
So --warehouse-dir will be used extensively.
UDF in Hive
package udf_example;
import org.apache.hadoop.hive.ql.exec.UDF;
public class DataStandardization extends UDF{
public String evaluate(String input){
if(input == null){
return null;
}
return (input.toUpperCase());
}
}
rankfunctions.txt
Frank,1150
Frank,1700
CREATE TABLE IF NOT EXISTS rankchcektable(
col1 string,
col2 int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE
load data local inpath '/Users/basan/rankfunctions.txt' into table rankchcektable
Create jar
//Add the jar to hive
hive>add jar /users/basan/my_udf.jar
It will add jar to classpath
//create the function in hive
create temporary function f1 as 'udf_example.DataStandardization';
create temporary function functionname as 'packagename.classname';
//applying udf function
select f1(col1) from rankchcektable
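For comparison, the same null-safe uppercase logic can be registered as a Spark SQL UDF (a sketch of the Spark approach, not part of the Hive example above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("udf-example").master("local[*]").getOrCreate()

// Same null-safe uppercase standardization as the Hive UDF, registered for Spark SQL.
spark.udf.register("f1", (input: String) => if (input == null) null else input.toUpperCase)

spark.sql("select f1('frank')").show()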
Sort by, order by, distribute by and cluster by queries
Day2 :
Load the order.txt
create the table
order by - full sorting of data 1 reducer
select count from table2 order by count
https://drive.google.com/file/d/1abNo-jsy_l_Xo0krBoHpW3tkpSukr3Lp/view
order by does global sorting and uses only one reducer.
We will run order by, sort by, cluster by and distribute by, which are all related to sorting
aa,1
bb,1
dd,5
ef,2
teh,1
CREATE TABLE IF NOT EXISTS table2(
name string,
count int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
load data local inpath '/Users/basan/order.txt' into table table2
select count from table2 order by count
order by will use only 1 reducer, independent of how many reducers we configure
//set explicitly 2 reducers
SET mapreduce.job.reduces=2
select count from table2 sort by count
it will produce 2 different data sets
sort by considers the number of reducers, and sorting happens within each respective reducer.
It will produce two partially sorted output files, one per reducer.
group by will also use a single reducer
distribute by - will send data to specific reducer
SET mapreduce.job.reduces=2
select count from table2 distribute by count sort by count
It will make sure there is no overlap of data
Ideally we should use distribute by followed by sort by
By doing this we can make use of both the reducers
cluster by count is the same as distribute by count sort by count
select count from table2 cluster by count;
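As a rough Spark DataFrame analogue of the above (a sketch, built on the same sample data): repartition corresponds to distribute by, sortWithinPartitions to sort by, and orderBy to a global order by.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("sorting-example").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("aa", 1), ("bb", 1), ("dd", 5), ("ef", 2), ("teh", 1)).toDF("name", "count")

// order by: global sort, single total ordering (one "reducer").
df.orderBy(col("count")).show()

// distribute by + sort by: shuffle on count into 2 partitions, then sort within each partition.
df.repartition(2, col("count")).sortWithinPartitions(col("count")).show()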
Rankings
create table using rankfunctions.txt
John,1500
Albert,1500
Mark,1000
Frank,1150
Frank,1700
CREATE TABLE IF NOT EXISTS rankchcektable(
col1 string,
col2 int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE
load data local inpath '/Users/basan/rankfunctions.txt' into table rankchcektable
//overwrite the old data
load data local inpath '/Users/basan/rankfunctions.txt' overwrite into table rankchcektable
select col1, col2, rank() over (order by col2 desc) as ranking from table2
gives the ranking; ties are assigned the same rank and the next rank is skipped
Mark,1000 1
Frank,1150 2
John,1500 3
Albert,1500 3
Frank,1700 5
Difference between rank, dense rank and rownum
select col1, col2, dense_rank() over (order by col2 desc) as ranking from table2
Ties are assigned the same rank; ranks are not skipped.
Mark,1000 1
Frank,1150 2
John,1500 3
Albert,1500 3
Frank,1700 4
row_number: used to find the top n rankings
select col1, col2, row_number() over (order by col2 desc) as ranking from table2
Mark,1000 1
Frank,1150 2
John,1500 3
Albert,1500 4
Frank,1700 5
So the rank assigned will be unique
select col1, col2, row_number() over (partition by col1 order by col2 desc) as ranking from table2
First it partitions (groups) by col1, then assigns the ranking within each group
A use case is finding the top 2 spends per person.
Albert 1500 1
Bhut 800 1
lesa 1500 1
lesa 900 2
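A Spark sketch of that top-2-per-person use case (assuming the same two-column data as above; the window API mirrors the Hive query):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder().appName("top-n-example").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("John", 1500), ("Albert", 1500), ("Mark", 1000), ("Frank", 1150), ("Frank", 1700))
  .toDF("col1", "col2")

// row_number within each person, highest spend first, then keep the top 2 rows per person.
val w = Window.partitionBy(col("col1")).orderBy(col("col2").desc)
df.withColumn("ranking", row_number().over(w))
  .where(col("ranking") <= 2)
  .show()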
Saturday, October 19, 2019
BigData concepts in a nutshell
In the version of HDP1.0 we had only HDFS and MR
In HDP 2.0 we started having HDFS, MR and YARN
With Spark, YARN, Mesos or Kubernetes can be used as the resource negotiator
HDFS : Addresses distributed storage issues
Pig : a scripting language which can be used for ETL-style jobs. In the data pipeline we do the cleaning using Pig scripts and then load into a Hive table.
HBase : a NoSQL database which provides ACID behaviour. With Hive we cannot update or delete a record; with HBase we can.
There is a way to make Hive table accessible in Hbase.
Oozie : It will be used for scheduling jobs.
Usually, scenarios involving processing of whole tables go to Hive; search-by-key use cases fall to HBase
Pig uses only the mapper phase; all the other components use both mapper and reducer
The balanced approach of 2 racks and 3 copies in the rack-awareness mechanism is adopted to
minimize write bandwidth and maximize redundancy.
Name node federation concept is meant for -
Load sharing.
In which of the following scenarios can the introduction of a combiner lead to wrong results -
Calculating the average.
Consider the following table structure: students (name STRING, id INT, subjects ARRAY<...>, feeDetails MAP<...>, phoneNumber STRUCT<...>). To list the subjects taken by each student, we can use the following query, which executes successfully: select name, explode(subjects) from students;
False
Which of the following work-flow is valid in MR
map->partition->shuffle->sort->reduce.
NameNode federation : metadata can be divided across nodes. It is for load sharing.
Secondary NameNode : for checkpointing and fault tolerance.
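A quick Scala sketch of why a combiner breaks averages (hypothetical numbers): averaging per-partition averages is not the same as the overall average, so the safe pattern is to combine (sum, count) pairs and divide only at the end.

// Partition 1 holds (1, 2), partition 2 holds (3, 4, 5).
val partition1 = Seq(1.0, 2.0)
val partition2 = Seq(3.0, 4.0, 5.0)

// Wrong: average of per-partition averages (what a naive combiner would compute).
val wrong = (partition1.sum / partition1.size + partition2.sum / partition2.size) / 2  // (1.5 + 4.0) / 2 = 2.75

// Right: combine (sum, count) pairs and divide once at the end.
val totalSum = partition1.sum + partition2.sum      // 15.0
val totalCount = partition1.size + partition2.size  // 5
val right = totalSum / totalCount                   // 3.0

println(s"wrong = $wrong, right = $right")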
AWS Setup of HDFS
EC2 setup, free credits cannot be used for EMR cluster
AWS Management Console
Services - Search for EMR -
Create cluster
provide clustername - basancluster
select emr-5.27.0
which will have hadoop Ganglia hive
Can use m4.large machine - 0.10$/hr per instance cost
Google for ec2 pricing : can check the price of machines
number of instances 3
create key pair and configure the AWS to access cluster using PEM file
s3 is free in AWS
It will take some time to create the AWS cluster after the request.
Enable ssh to the master for accessing via ssh command
click on create security group
inbound : traffic coming inside the system, to access ssh create inbound rule
Edit inbound rules :
ssh 22 select : myip
Once this is done it will take some time for creating the cluster
Connect to the master node using below command
ssh -i filecreated.pem hadoop@ip.compute.amazonaws.com
AWS commands
hive
create database trendytech;
use trendytech;
show table
upload the file to S3 by creating a bucket (search for S3 and create a bucket; under it, folders and files
can be created)
In the services -> search for s3 buckets
loading data from s3 to table
load data inpath 's3://basantech-basan/dataset.csv' into table country_input
remove the word local
Under the bucket create the folder and the folder path can be configured
Amazon - AWS - EMR - Elastic map reduce
Free tier is not valid for emr in AWS
Google - GCP- GoogleDataProc
Microsoft - Azure - HD Insight
AWS is very famous among all the clusters
Need not use Cloudera Manager.
hadoop fs -ls /user/hive/warehouse can be seen in elastic search as well.
Search on Services - EMR
Terminate the cluster to kill the service and avoid getting charged on your credit card.
HDFS Loading files and creating TEXTFORMAT and ORC files
Create the Text Format table
CREATE TABLE IF NOT EXISTS country_input(
longitude float,
latitude float,
number int,
street string,
unit string,
city string,
district string,
region string,
postcode int,
id string,
hash string,
country string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
//loading the table from the file located in the path
load data local inpath '/Users/basan/datasetnew' into table country_input
select * from country_input where country='belgium' and street='Rue Ketels';
//Create table in ORC FORMAT
CREATE TABLE IF NOT EXISTS country_input_orc(
longitude float,
latitude float,
number int,
street string,
unit string,
city string,
district string,
region string,
postcode int,
id string,
hash string,
country string
)ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS orc
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' is not needed in ORC file format.
For populating into ORC table from the text file
insert into table country_input_orc select * from country_input
/user/hive/warehouse - location for ORC tables
ORC tables occupy less disk space and selects are much faster compared to the TEXTFILE format.
If we want to make the queries still faster, we have to use partitioning and bucketing. For each partition there will be a folder created, and
for each bucket there will be a file created under the partitioned folder. Once we define the number of buckets we cannot change it; if it needs to be changed
we have to create a fresh table and import the data from the old table
CREATE TABLE IF NOT EXISTS country_bucket(
longitude float,
latitude float,
number int,
street string,
unit string,
city string,
district string,
region string,
postcode int,
id string,
hash string
) PARTITIONED BY (country string)
CLUSTERED BY (street) INTO 32 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
-- the partition column (country) is declared only in PARTITIONED BY, not in the column list
We have created the table with country partitioning and 32 buckets
//Getting size of the underlaying folders
hadoop fs -du -h /user/hive/warehouse/trendytech.db
hadoop fs -ls /user/hive/warehouse/trendytech.db/country_bucket
For each partition there will be one folder created
hadoop fs -ls /user/hive/warehouse/trendytech.db/country_bucket/country=estonia
will list all the files under the partitioned folder
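A small Spark SQL sketch tying this together (assuming a SparkSession built with Hive support and that the table has been populated): filtering on the partition column means only the matching country=... folder is scanned.

import org.apache.spark.sql.SparkSession

// Sketch only: assumes Spark with Hive support and that country_bucket already has data.
val spark = SparkSession.builder()
  .appName("partition-pruning-example")
  .enableHiveSupport()
  .getOrCreate()

// One entry per partition folder, e.g. country=estonia.
spark.sql("SHOW PARTITIONS trendytech.country_bucket").show(false)

// The filter on the partition column prunes the scan to a single folder on HDFS.
spark.sql("SELECT count(*) FROM trendytech.country_bucket WHERE country = 'estonia'").show()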
Sunday, October 13, 2019
Date Time Operation in Scala and Spark
scala> import java.sql.Date
import java.sql.Date
scala> import org.apache.spark.sql.types.{DateType, IntegerType}
import org.apache.spark.sql.types.{DateType, IntegerType}
scala> Date.valueOf("2016-09-30")
res0: java.sql.Date = 2016-09-30
scala> import java.time.LocalDateTime
import java.time.LocalDateTime
scala>
scala> import java.sql.Timestamp
import java.sql.Timestamp
scala> val levels = Seq(
| // (year, month, dayOfMonth, hour, minute, second)
| ((2012, 12, 12, 12, 12, 12), 5),
| ((2012, 12, 12, 12, 12, 14), 9),
| ((2012, 12, 12, 13, 13, 14), 4),
| ((2016, 8, 13, 0, 0, 0), 10),
| ((2017, 5, 27, 0, 0, 0), 15)).
| map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
| map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
| toDF("time", "level")
levels: org.apache.spark.sql.DataFrame = [time: timestamp, level: int]
scala>
scala> levels.show
+-------------------+-----+
| time|level|
+-------------------+-----+
|2012-12-12 12:12:12| 5|
|2012-12-12 12:12:14| 9|
|2012-12-12 13:13:14| 4|
|2016-08-13 00:00:00| 10|
|2017-05-27 00:00:00| 15|
+-------------------+-----+
scala>
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-datetime.html
Saturday, October 12, 2019
Spark Unit Test First example using scalatest
package com.chapter16.SparkTesting

import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.scalatest.Assertions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD

class wordCountTest2 extends FunSuite with BeforeAndAfterAll {
  var spark: SparkSession = null

  def tokenize(line: RDD[String]) = {
    line.map(x => x.split(' ')).collect()
  }

  override def beforeAll() {
    spark = SparkSession
      .builder
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "E:/Exp/")
      .appName(s"OneVsRestExample")
      .getOrCreate()
  }

  test("Test if two RDDs are equal") {
    val input = List("To be,", "or not to be:", "that is the question-", "William Shakespeare")
    val expected = Array(Array("To", "be,"), Array("or", "not", "to", "be:"), Array("that", "is", "the", "question-"), Array("William", "Shakespeare"))
    val transformed = tokenize(spark.sparkContext.parallelize(input))
    assert(transformed === expected)
  }

  test("Test for word count RDD") {
    val fileName = "C:/Users/rezkar/Downloads/words.txt"
    val obj = new wordCountRDD
    val result = obj.prepareWordCountRDD(fileName, spark)
    assert(result.count() === 214)
  }

  override def afterAll() {
    spark.stop()
  }
}
Tuesday, October 8, 2019
Creating Dataframe of structtype
val dfLocal2 = Seq(
(1,"PRESCHOOL", true, "basan", "11", "asdas", "3", "LINES"),
(2, null, false, "basan", "32", "asfasf", "13", "/COSMETICS"),
(3,"FURNITURE", null, "basan", "23", "asfasf HOME", "4", "HOME")
).toDF("id_int_null","string_null","boolean_null","TCIN","DIVISION_ID","DIVISION_NAME","GROUP_ID","GROUP_NAME")
dfLocal2.show()
dfLocal2.printSchema()
___
import org.apache.spark.sql.Row
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val someData = Seq(
Row(8, "bat"),
Row(64, "mouse"),
Row(-27, "horse"),
Row(null,"abc")
)
val someSchema = List(
StructField("number", IntegerType, true),
StructField("word", StringType, true)
)
val someDF = spark.createDataFrame(
sc.parallelize(someData),
StructType(someSchema)
)
someDF.show(6)
someDF.show()
(1,"PRESCHOOL", true, "basan", "11", "asdas", "3", "LINES"),
(2, null, false, "basan", "32", "asfasf", "13", "/COSMETICS"),
(3,"FURNITURE", null, "basan", "23", "asfasf HOME", "4", "HOME")
).toDF("id_int_null","string_null","boolean_null","TCIN","DIVISION_ID","DIVISION_NAME","GROUP_ID","GROUP_NAME")
dfLocal2.show()
dfLocal2.printSchema()
___
import org.apache.spark.sql.Row
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val someData = Seq(
Row(8, "bat"),
Row(64, "mouse"),
Row(-27, "horse"),
Row(null,"abc")
)
val someSchema = List(
StructField("number", IntegerType, true),
StructField("word", StringType, true)
)
val someDF = spark.createDataFrame(
sc.parallelize(someData),
StructType(someSchema)
)
someDF.show(6)
someDF.show()
Try Success failure on RDD operation
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.Row
val rdd= spark.sparkContext.parallelize(Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000)))
val goodBadRecords = rdd.map(line =>
Try{
print(line)
} match {
case Success(map) => Right(map)
case Failure(e) => Left(e)
}
)
val records = goodBadRecords.filter(_.isRight)
val errors = goodBadRecords.filter(_.isLeft)
records.count()
errors.count()
Monday, October 7, 2019
create Dstream manually using REPL
import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
val lines = Seq("To be or not to be.", "That is the question.")
val rdd = sc.parallelize(lines)
val lines = mutable.Queue[RDD[String]]()
val streamingContext = new StreamingContext(sc, Seconds(10))
val dstream = streamingContext.queueStream(lines)
// append data to DStream
lines += sc.makeRDD(Seq("To be or not to be.", "That is the question."))
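To actually see output from this queue-backed DStream, the streaming context still has to be started (a minimal sketch continuing the snippet above; the timeout value is arbitrary):

// Print each batch, then start the streaming context.
dstream.print()
streamingContext.start()

// More RDDs can be pushed into the queue while the stream is running.
lines += sc.makeRDD(Seq("To be or not to be.", "That is the question."))

// Let a few batch intervals pass, then stop without killing the SparkContext.
streamingContext.awaitTerminationOrTimeout(30000)
streamingContext.stop(stopSparkContext = false)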
http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/
Add fixed value column in dataframe
scala> val dfLocal = Seq(
| ("DEPARTMENT_ID1","DEPARTMENT_NAME1","GROUP_NAME1"),
|
| ("1","PRESCHOOL", "asdasf"),
| ("2","COSMETICS", "COSMETICS"),
| ("3","FURNITURE", "HOME")
| ).toDF("DEPARTMENT_ID","DEPARTMENT_NAME","GROUP_NAME")
dfLocal: org.apache.spark.sql.DataFrame = [DEPARTMENT_ID: string, DEPARTMENT_NAME: string ... 1 more field]
scala> dfLocal.printSchema()
root
|-- DEPARTMENT_ID: string (nullable = true)
|-- DEPARTMENT_NAME: string (nullable = true)
|-- GROUP_NAME: string (nullable = true)
scala> import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.functions.typedLit
scala> val successDataSet2 = dfLocal.withColumn("newcolumnadded", lit("204"))
successDataSet2: org.apache.spark.sql.DataFrame = [DEPARTMENT_ID: string, DEPARTMENT_NAME: string ... 2 more fields]
scala> successDataSet2.show(2)
+--------------+----------------+-----------+--------------+
| DEPARTMENT_ID| DEPARTMENT_NAME| GROUP_NAME|newcolumnadded|
+--------------+----------------+-----------+--------------+
|DEPARTMENT_ID1|DEPARTMENT_NAME1|GROUP_NAME1| 204|
| 1| PRESCHOOL| asdasf| 204|
+--------------+----------------+-----------+--------------+
only showing top 2 rows
scala> successDataSet2.printSchema()
root
|-- DEPARTMENT_ID: string (nullable = true)
|-- DEPARTMENT_NAME: string (nullable = true)
|-- GROUP_NAME: string (nullable = true)
|-- newcolumnadded: string (nullable = false)
scala>
| ("DEPARTMENT_ID1","DEPARTMENT_NAME1","GROUP_NAME1"),
|
| ("1","PRESCHOOL", "asdasf"),
| ("2","COSMETICS", "COSMETICS"),
| ("3","FURNITURE", "HOME")
| ).toDF("DEPARTMENT_ID","DEPARTMENT_NAME","GROUP_NAME")
dfLocal: org.apache.spark.sql.DataFrame = [DEPARTMENT_ID: string, DEPARTMENT_NAME: string ... 1 more field]
scala> dfLocal.printSchema()
root
|-- DEPARTMENT_ID: string (nullable = true)
|-- DEPARTMENT_NAME: string (nullable = true)
|-- GROUP_NAME: string (nullable = true)
scala> import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.functions.typedLit
scala> val successDataSet2 = dfLocal.withColumn("newcolumnadded", lit("204"))
successDataSet2: org.apache.spark.sql.DataFrame = [DEPARTMENT_ID: string, DEPARTMENT_NAME: string ... 2 more fields]
scala> successDataSet2.show(2)
+--------------+----------------+-----------+--------------+
| DEPARTMENT_ID| DEPARTMENT_NAME| GROUP_NAME|newcolumnadded|
+--------------+----------------+-----------+--------------+
|DEPARTMENT_ID1|DEPARTMENT_NAME1|GROUP_NAME1| 204|
| 1| PRESCHOOL| asdasf| 204|
+--------------+----------------+-----------+--------------+
only showing top 2 rows
scala> successDataSet2.printSchema()
root
|-- DEPARTMENT_ID: string (nullable = true)
|-- DEPARTMENT_NAME: string (nullable = true)
|-- GROUP_NAME: string (nullable = true)
|-- newcolumnadded: string (nullable = false)
scala>
Sunday, October 6, 2019
De dupe in Dataframe
val df = Seq(
("d1", "2018-09-20 10:00:00", "blah1"),
("d2", "2018-09-20 09:00:00", "blah2"),
("d1", "2018-09-20 10:01:00", "blahnew")
).toDF("documentId","timestamp","anotherField")
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"documentId").orderBy($"timestamp".desc)
val Resultdf = df.withColumn("rownum", row_number.over(w))
.where($"rownum" === 1).drop("rownum")
Resultdf.show()
input:
+----------+-------------------+------------+
|documentId| timestamp|anotherField|
+----------+-------------------+------------+
| d1|2018-09-20 10:00:00| blah1|
| d2|2018-09-20 09:00:00| blah2|
| d1|2018-09-20 10:01:00| blahnew|
+----------+-------------------+------------+
output:
+----------+-------------------+------------+
|documentId| timestamp|anotherField|
+----------+-------------------+------------+
| d2|2018-09-20 09:00:00| blah2|
| d1|2018-09-20 10:01:00| blahnew|
+----------+-------------------+------------+
("d1", "2018-09-20 10:00:00", "blah1"),
("d2", "2018-09-20 09:00:00", "blah2"),
("d1", "2018-09-20 10:01:00", "blahnew")
).toDF("documentId","timestamp","anotherField")
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"documentId").orderBy($"timestamp".desc)
val Resultdf = df.withColumn("rownum", row_number.over(w))
.where($"rownum" === 1).drop("rownum")
Resultdf.show()
input:
+----------+-------------------+------------+
|documentId| timestamp|anotherField|
+----------+-------------------+------------+
| d1|2018-09-20 10:00:00| blah1|
| d2|2018-09-20 09:00:00| blah2|
| d1|2018-09-20 10:01:00| blahnew|
+----------+-------------------+------------+
output:
+----------+-------------------+------------+
|documentId| timestamp|anotherField|
+----------+-------------------+------------+
| d2|2018-09-20 09:00:00| blah2|
| d1|2018-09-20 10:01:00| blahnew|
+----------+-------------------+------------+
Converting string date value to Date object
scala> import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.IntegerType
scala> import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StringType
scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType
scala> import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructField
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> val data = Seq(("Java", 20000 , "2012-02-05"), ("Python", 100000 , "2012-02-06"), ("Scala", 3000 , "2012-02-07"))
data: Seq[(String, Int, String)] = List((Java,20000,2012-02-05), (Python,100000,2012-02-06), (Scala,3000,2012-02-07))
scala> val tableColumns = List(("language", "string"), ("people", "integer"), ("registerdate", "String"))
tableColumns: List[(String, String)] = List((language,string), (people,integer), (registerdate,String))
scala> var schema = new StructType
schema: org.apache.spark.sql.types.StructType = StructType()
scala> for (i <- tableColumns)
     |          schema = schema.add(i._1, i._2)
scala> val rowData = data.map(attributes => Row(attributes._1, attributes._2 , attributes._3))
rowData: Seq[org.apache.spark.sql.Row] = List([Java,20000,2012-02-05], [Python,100000,2012-02-06], [Scala,3000,2012-02-07])
scala> var dfFromData4 = spark.createDataFrame(rowData,schema)
dfFromData4: org.apache.spark.sql.DataFrame = [language: string, people: int ... 1 more field]
scala> dfFromData4.show(2)
+--------+------+------------+
|language|people|registerdate|
+--------+------+------------+
| Java| 20000| 2012-02-05|
| Python|100000| 2012-02-06|
+--------+------+------------+
only showing top 2 rows
scala>
scala> import org.apache.spark.sql.functions.to_date
import org.apache.spark.sql.functions.to_date
scala> val dateFormat = "yyyy-dd-MM"
dateFormat: String = yyyy-dd-MM
scala> var dfFromData5 = dfFromData4.withColumn("registerdate2", to_date(col("registerdate"), dateFormat))
dfFromData5: org.apache.spark.sql.DataFrame = [language: string, people: int ... 2 more fields]
scala> dfFromData5.show(3)
+--------+------+------------+-------------+
|language|people|registerdate|registerdate2|
+--------+------+------------+-------------+
| Java| 20000| 2012-02-05| 2012-05-02|
| Python|100000| 2012-02-06| 2012-06-02|
| Scala| 3000| 2012-02-07| 2012-07-02|
+--------+------+------------+-------------+
scala> dfFromData5.printSchema()
root
|-- language: string (nullable = true)
|-- people: integer (nullable = true)
|-- registerdate: string (nullable = true)
|-- registerdate2: date (nullable = true)
Using Date Format in dataframe
scala> import org.apache.spark.sql.functions.to_date
import org.apache.spark.sql.functions.to_date
scala> val dateFormat = "yyyy-dd-MM"
dateFormat: String = yyyy-dd-MM
scala> val cleanDateDF = spark.range(1).select(
| to_date(lit("2017-12-11"), dateFormat).alias("date"),
| to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF: org.apache.spark.sql.DataFrame = [date: date, date2: date]
scala> cleanDateDF.show(2)
+----------+----------+
| date| date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+
Spark: The Definitive Guide has good details about manipulating dates and timestamps.
Converting String field to Date in Dataframe
val sourceDF = spark.createDF(
List(
(1, "2013-01-30"),
(2, "2012-01-01")
), List(
("person_id", IntegerType, true),
("birth_date", StringType, true)
)
).withColumn(
"birth_date",
col("birth_date").cast("date")
)
sourceDF.show()
sourceDF.printSchema()
https://mungingdata.com/apache-spark/dates-times/
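createDF above comes from the spark-daria helpers described at the link above; a rough equivalent using only vanilla Spark (a sketch) would be:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("string-to-date").master("local[*]").getOrCreate()
import spark.implicits._

// Plain Spark: build the DataFrame with toDF, then cast the string column to date.
val sourceDF = Seq((1, "2013-01-30"), (2, "2012-01-01"))
  .toDF("person_id", "birth_date")
  .withColumn("birth_date", col("birth_date").cast("date"))

sourceDF.show()
sourceDF.printSchema()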
Saturday, October 5, 2019
Converting Dataframe String type to Integer
acbc32a57245:bin z002qhl$ ./spark-shell
19/10/06 12:05:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.2:4040
Spark context available as 'sc' (master = local[*], app id = local-1570343764090).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.1
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.IntegerType
scala> import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StringType
scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType
scala> import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructField
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
data: Seq[(String, String)] = List((Java,20000), (Python,100000), (Scala,3000))
scala> val tableColumns = List(("order_number", "string"), ("external_order_number", "string"))
tableColumns: List[(String, String)] = List((order_number,string), (external_order_number,string))
scala> var schema = new StructType
schema: org.apache.spark.sql.types.StructType = StructType()
scala> for (i <- tableColumns)
     |          schema = schema.add(i._1, i._2)
scala> val rowData = data.map(attributes => Row(attributes._1, attributes._2))
rowData: Seq[org.apache.spark.sql.Row] = List([Java,20000], [Python,100000], [Scala,3000])
scala> var dfFromData4 = spark.createDataFrame(rowData,schema)
dfFromData4: org.apache.spark.sql.DataFrame = [order_number: string, external_order_number: string]
scala> dfFromData4.show(2)
+------------+---------------------+
|order_number|external_order_number|
+------------+---------------------+
| Java| 20000|
| Python| 100000|
+------------+---------------------+
only showing top 2 rows
scala>
scala> val dfFromData5 = dfFromData4.withColumn("external_order_number", 'external_order_number cast "int")
dfFromData5: org.apache.spark.sql.DataFrame = [order_number: string, external_order_number: int]
scala> dfFromData5.printSchema
root
|-- order_number: string (nullable = true)
|-- external_order_number: integer (nullable = true)
<- p="" tablecolumns="">
-> <- p="" tablecolumns="">
-> <- p="" tablecolumns="">
-> <- p="" tablecolumns="">
-> <- p="" tablecolumns="">---------->
<- p="" tablecolumns="">
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row
import scala.collection.JavaConversions._
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val tableColumns = List(("order_number", "string"), ("external_order_number", "string"))
var schema = new StructType
for (i <- p="" tablecolumns=""> schema = schema.add(i._1, i._2)
val rowData = data.map(attributes => Row(attributes._1, attributes._2))
var dfFromData4 = spark.createDataFrame(rowData,schema)
dfFromData4.show(2)
val dfFromData6 =dfFromData4.withColumn("external_order_number", dfFromData4("external_order_number").cast("int"))
<- p="" tablecolumns="">->
dfFromData6.printSchema->
Message to RDD to Dataframe example
import spark.implicits._
val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)
val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
val dfFromRDD1 = rdd.toDF("language","users_count")
dfFromRDD1.printSchema()
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row
val schema = StructType(columns
.map(fieldName => StructField(fieldName, StringType)))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD,schema)
import spark.implicits._
val dfFromData1 = data.toDF()
import scala.collection.JavaConversions._
val rowData = data.map(attributes => Row(attributes._1, attributes._2))
var dfFromData4 = spark.createDataFrame(rowData,schema)
Message to Dataframe using Schema
acbc32a57245:bin z002qhl$ ./spark-shell
19/10/06 11:40:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.2:4040
Spark context available as 'sc' (master = local[*], app id = local-1570342289541).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.1
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.IntegerType
scala> import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StringType
scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType
scala> import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructField
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> val data = Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000))
data: Seq[(String, Int)] = List((Java,20000), (Python,100000), (Scala,3000))
scala> val tableColumns = List(("order_number", "string"), ("external_order_number", "integer"))
tableColumns: List[(String, String)] = List((order_number,string), (external_order_number,integer))
scala> var schema = new StructType
schema: org.apache.spark.sql.types.StructType = StructType()
scala> for (i <- tableColumns)
| schema = schema.add(i._1, i._2)
scala> val rowData = data.map(attributes => Row(attributes._1, attributes._2))
rowData: Seq[org.apache.spark.sql.Row] = List([Java,20000], [Python,100000], [Scala,3000])
scala> var dfFromData4 = spark.createDataFrame(rowData,schema)
dfFromData4: org.apache.spark.sql.DataFrame = [order_number: string, external_order_number: int]
scala> dfFromData4.show(2)
+------------+---------------------+
|order_number|external_order_number|
+------------+---------------------+
| Java| 20000|
| Python| 100000|
+------------+---------------------+
only showing top 2 rows
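The var + for loop in the transcript can also be folded into the schema, or the schema declared directly from a DDL string (StructType.fromDDL is available from Spark 2.3 onwards); a minimal sketch reusing the tableColumns list from above:
import org.apache.spark.sql.types.StructType
// fold the (name, type) pairs into a schema without a mutable var
val schemaFromFold = tableColumns.foldLeft(new StructType()) {
  case (acc, (name, dataType)) => acc.add(name, dataType)
}
// or declare the equivalent schema from a DDL string
val schemaFromDDL = StructType.fromDDL("order_number STRING, external_order_number INT")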
Creating a DataFrame in spark-shell
acbc32a57245:bin z002qhl$ pwd
/Users/basan/Documents/Spark/spark-2.4.1-bin-hadoop2.7/bin
acbc32a57245:bin z002qhl$ ./spark-s
spark-shell spark-sql spark-submit
acbc32a57245:bin basan$ ./spark-shell
19/10/06 08:43:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.4:4040
Spark context available as 'sc' (master = local[*], app id = local-1570331637922).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.1
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val data = List(("James ","","Smith","36636","M",60000),
| ("Michael ","Rose","","40288","M",70000),
| ("Robert ","","Williams","42114","",400000),
| ("Maria ","Anne","Jones","39192","F",500000),
| ("Jen","Mary","Brown","","F",0))
data: List[(String, String, String, String, String, Int)] = List(("James ","",Smith,36636,M,60000), ("Michael ",Rose,"",40288,M,70000), ("Robert ","",Williams,42114,"",400000), ("Maria ",Anne,Jones,39192,F,500000), (Jen,Mary,Brown,"",F,0))
scala> val cols = Seq("first_name","middle_name","last_name","dob","gender","salary")
cols: Seq[String] = List(first_name, middle_name, last_name, dob, gender, salary)
scala> val df = spark.createDataFrame(data).toDF(cols:_*)
df: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 4 more fields]
scala> df.show(2)
+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name| dob|gender|salary|
+----------+-----------+---------+-----+------+------+
| James | | Smith|36636| M| 60000|
| Michael | Rose| |40288| M| 70000|
+----------+-----------+---------+-----+------+------+
only showing top 2 rows
scala> val df4 = df.select(col("*"),
| expr("case when gender = 'M' then 'Male' " +
| "when gender = 'F' then 'Female' " +
| "else 'Unknown' end").alias("new_gender"))
df4: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 5 more fields]
scala> df4.show(2)
+----------+-----------+---------+-----+------+------+----------+
|first_name|middle_name|last_name| dob|gender|salary|new_gender|
+----------+-----------+---------+-----+------+------+----------+
| James | | Smith|36636| M| 60000| Male|
| Michael | Rose| |40288| M| 70000| Male|
+----------+-----------+---------+-----+------+------+----------+
only showing top 2 rows
scala> val df3 = df.withColumn("new_gender2",
| expr("case when gender = 'M' then 'Male' " +
| "when gender = 'F' then 'Female' " +
| "else 'Unknown' end"))
df3: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 5 more fields]
scala> df3.show(2)
+----------+-----------+---------+-----+------+------+-----------+
|first_name|middle_name|last_name| dob|gender|salary|new_gender2|
+----------+-----------+---------+-----+------+------+-----------+
| James | | Smith|36636| M| 60000| Male|
| Michael | Rose| |40288| M| 70000| Male|
+----------+-----------+---------+-----+------+------+-----------+
only showing top 2 rows
scala>
scala> val df3 = df.withColumn("geneder",
| expr("case when gender = 'M' then 'Male' " +
| "when gender = 'F' then 'Female' " +
| "else 'Unknown' end"))
df3: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 5 more fields]
scala> df3.show(2)
+----------+-----------+---------+-----+------+------+-------+
|first_name|middle_name|last_name| dob|gender|salary|geneder|
+----------+-----------+---------+-----+------+------+-------+
| James | | Smith|36636| M| 60000| Male|
| Michael | Rose| |40288| M| 70000| Male|
+----------+-----------+---------+-----+------+------+-------+
only showing top 2 rows
scala>
scala> val df3 = df.withColumn("gender",
| expr("case when gender = 'M' then 'Male' " +
| "when gender = 'F' then 'Female' " +
| "else 'Unknown' end"))
df3: org.apache.spark.sql.DataFrame = [first_name: string, middle_name: string ... 4 more fields]
scala> df3.show(2)
+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name| dob|gender|salary|
+----------+-----------+---------+-----+------+------+
| James | | Smith|36636| Male| 60000|
| Michael | Rose| |40288| Male| 70000|
+----------+-----------+---------+-----+------+------+
only showing top 2 rows
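The case-when expressions above can also be written with the DataFrame API's when/otherwise functions instead of a SQL expr string; a minimal sketch assuming the df built in this session:
import org.apache.spark.sql.functions.{col, when}
// functional form of the case-when used above
val dfWhen = df.withColumn("new_gender",
  when(col("gender") === "M", "Male")
    .when(col("gender") === "F", "Female")
    .otherwise("Unknown"))
dfWhen.show(2)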
Wednesday, October 2, 2019
Add jar to spark classpath
scala> :require /Users/basan/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.2.2.jar
Added '/Users/basan/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.2.2.jar' to classpath.
scala> import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
scala> val record1 : ConsumerRecord[String, String] = new ConsumerRecord("topic1",0,100,"Key1", "value1")
record1: org.apache.kafka.clients.consumer.ConsumerRecord[String,String] = ConsumerRecord(topic = topic1, partition = 0, offset = 100, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = Key1, value = value1)
scala> val record2 : ConsumerRecord[String, String] = new ConsumerRecord("topic1",0,100,"Key2", "value2")
record2: org.apache.kafka.clients.consumer.ConsumerRecord[String,String] = ConsumerRecord(topic = topic1, partition = 0, offset = 100, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = Key2, value = value2)
scala> record1.value()
res0: String = value1
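The jar can equally be supplied at launch with ./spark-shell --jars <path-to-kafka-clients-jar> instead of :require. With the record1 created above, the other ConsumerRecord accessors behave the same way; a short sketch:
// accessors on the record built above
record1.key()        // Key1
record1.topic()      // topic1
record1.partition()  // 0
record1.offset()     // 100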