
Saturday, October 5, 2019

Message to DataFrame using a Schema

This post shows how to build a DataFrame from a Seq of message tuples by constructing a StructType schema dynamically from a list of (column name, type) pairs.

acbc32a57245:bin z002qhl$ ./spark-shell
19/10/06 11:40:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.2:4040
Spark context available as 'sc' (master = local[*], app id = local-1570342289541).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.1
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.IntegerType

scala> import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StringType

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructField

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
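
A note on the last import: scala.collection.JavaConversions supplies the implicit conversion that lets the Scala Seq[Row] built below be passed to the createDataFrame overload that expects a java.util.List[Row]. JavaConversions is deprecated since Scala 2.12, so here is a minimal sketch of the explicit alternative using the asJava converter (the rows value is introduced only for illustration):

import scala.collection.JavaConverters._
import org.apache.spark.sql.Row

// Explicit conversion in place of the deprecated implicit one:
val rows: java.util.List[Row] = Seq(Row("Java", 20000)).asJava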

scala>     val data = Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000))
data: Seq[(String, Int)] = List((Java,20000), (Python,100000), (Scala,3000))

scala>     val tableColumns = List(("order_number", "string"), ("external_order_number", "integer"))
tableColumns: List[(String, String)] = List((order_number,string), (external_order_number,integer))

scala>     var schema = new StructType
schema: org.apache.spark.sql.types.StructType = StructType()

scala>     for (i <- tableColumns)
     |       schema = schema.add(i._1, i._2)
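
The loop repeatedly reassigns the schema var; StructType.add parses the type given as a string ("string", "integer") into the matching Spark DataType. An immutable sketch of the same construction with foldLeft (builtSchema is a name introduced here, not from the session):

// Fold over the (name, type) pairs; add returns a new StructType each time.
val builtSchema = tableColumns.foldLeft(new StructType()) {
  case (acc, (name, typeName)) => acc.add(name, typeName)
}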

scala>       val rowData = data.map(attributes => Row(attributes._1, attributes._2))
rowData: Seq[org.apache.spark.sql.Row] = List([Java,20000], [Python,100000], [Scala,3000])
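
The same mapping can also be written with Row.fromTuple, which builds a Row from any tuple; an equivalent sketch:

// Each (String, Int) tuple becomes a two-field Row.
val rowData2 = data.map(Row.fromTuple)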

scala>       var dfFromData4 = spark.createDataFrame(rowData,schema)
dfFromData4: org.apache.spark.sql.DataFrame = [order_number: string, external_order_number: int]
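
createDataFrame here resolves to the overload taking a java.util.List[Row] plus a StructType, which is what the JavaConversions import makes the Scala Seq satisfy. A sketch of the same result without the implicit conversion, distributing the rows as an RDD first:

// createDataFrame also accepts an RDD[Row] with an explicit schema.
val dfFromRDD = spark.createDataFrame(spark.sparkContext.parallelize(rowData), schema)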

scala>       dfFromData4.show(2)
+------------+---------------------+
|order_number|external_order_number|
+------------+---------------------+
|        Java|                20000|
|      Python|               100000|
+------------+---------------------+
only showing top 2 rows
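
To confirm the schema built in the loop was applied, printSchema lists the parsed column types; the expected output for this session (nullable defaults to true when add is given a type name string):

dfFromData4.printSchema
root
 |-- order_number: string (nullable = true)
 |-- external_order_number: integer (nullable = true)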
