
Tuesday, October 29, 2019

Read message and column transformations

This post shows how to read Kafka messages from a Spark Streaming DStream, parse them into a DataFrame against an explicit JSON schema, and decorate the result with derived columns (literals, dates, timestamps, UUIDs, concatenations and substrings).

import java.util.UUID


import com.typesafe.config.Config
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, _}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
import org.apache.spark.sql.functions._

import scala.collection.mutable
import scala.io.Source

/**
  * Creates the dataset by passing the RDD through the various steps.
  */
object DataHandlerService extends DataHandlerTrait {



  /** Reads a classpath resource into a String. */
  def loadResource(filename: String): String = {
    val path = getClass.getResource(filename)
    val source = Source.fromURL(path)
    try source.mkString finally source.close()
  }

  def processMessages(messages: InputDStream[ConsumerRecord[String, String]],
                      sparkSession: SparkSession,
                      errorDirPath: String,
                      successPath: String,
                      kafkaSink: Broadcast[KafkaSinkService],
                      brValidators: Broadcast[mutable.Map[String, Array[Validator]]],
                      brDefaultValues: Broadcast[mutable.Map[String, Any]],
                      brTransFormerMap: Broadcast[mutable.Map[String, AttributeTransformer]],
                      brConfig: Broadcast[Config]): Unit = {

    val schema = SchemaService.getSchemaDefinition()
    val encoder = RowEncoder(schema)



    // parse the schema definition shipped as a JSON resource on the classpath
    val fileContents = loadResource("/test-schema")
    println(fileContents)
    val schema3 = DataType.fromJson(fileContents).asInstanceOf[StructType]
    println(schema3)
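    // For reference, DataType.fromJson expects Spark's StructType JSON layout. A minimal
    // /test-schema could look like the following (the field names here are only illustrative,
    // borrowed from the columns used further down; the real resource is not shown in this post):
    // {
    //   "type": "struct",
    //   "fields": [
    //     {"name": "action",          "type": "string", "nullable": true, "metadata": {}},
    //     {"name": "atp_tracking_id", "type": "string", "nullable": true, "metadata": {}},
    //     {"name": "corrupt_record",  "type": "string", "nullable": true, "metadata": {}}
    //   ]
    // }
    // Note: for columnNameOfCorruptRecord to capture bad rows in PERMISSIVE mode, the schema
    // needs a string field with that name.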


    messages.foreachRDD(rdd => {
      import sparkSession.implicits._
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rdd.isEmpty()) {

        val dataSet: Dataset[String] = sparkSession.createDataset(rdd.map(x => x.value()))
        dataSet.show(2)

        // parse the raw JSON messages with the explicit schema; bad rows land in corrupt_record
        var streamData = sparkSession.read
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "corrupt_record")
          .schema(schema3)
          .json(dataSet)
        // val streamData = sparkSession.read.schema(schema3).json(dataSet)
        // streamData.write.json("/Users/z002qhl/Documents/Spark/SparkProjects/sparkStreaming/table")
        // streamData.write.mode("append").option("compression", "snappy").format("orc").saveAsTable("z002qhl.testtable")
        streamData.printSchema()

        //streamData.foreach( mes =>  println("msg:" + mes.mkString))
        // udf that generates a fresh UUID for every row
        val generateUUID = udf(() => UUID.randomUUID().toString)


        // val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
        // streamData.withColumn("ts", ts)

        // decorate the parsed messages with derived columns
        streamData = streamData
          .withColumn("decorated", lit(1))
          .withColumn("current_date", current_date())
          .withColumn("unix_timestamp", unix_timestamp())
          .withColumn("Next_processing_date", date_add(current_date(), 10))
          .withColumn("uuid", generateUUID())
          .withColumn("concatedrows", concat('action, 'atp_tracking_id))
          .withColumn("casting", $"action".substr(0, 3))

        streamData.show(5)

      }
      // commit the processed offsets back to Kafka
      messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
  }
}
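The decoration step does not depend on Kafka, so the same withColumn chain can be tried out against a small in-memory DataFrame first. The snippet below is only a sketch: it assumes a local Spark 2.x session, and the sample rows and values are made up, with just the column names (action, atp_tracking_id) borrowed from the code above.

import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DecorationSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("decoration-sketch")
      .getOrCreate()
    import spark.implicits._

    // sample rows standing in for parsed Kafka messages
    val streamData = Seq(
      ("create", "atp-001"),
      ("update", "atp-002")
    ).toDF("action", "atp_tracking_id")

    // same per-row UUID udf as in the streaming job
    val generateUUID = udf(() => UUID.randomUUID().toString)

    val decorated = streamData
      .withColumn("decorated", lit(1))
      .withColumn("current_date", current_date())
      .withColumn("unix_timestamp", unix_timestamp())
      .withColumn("Next_processing_date", date_add(current_date(), 10))
      .withColumn("uuid", generateUUID())
      .withColumn("concatedrows", concat($"action", $"atp_tracking_id"))
      .withColumn("casting", $"action".substr(0, 3))

    decorated.show(false)
    spark.stop()
  }
}

Running this prints one row per input record with the literal flag, the current date, the epoch timestamp, a processing date ten days out, a fresh UUID, the concatenated columns and a short prefix of action, which makes it easy to eyeball the transformations before wiring them into the stream.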
