Tuesday, October 29, 2019

Read message and column transformations

import java.util.UUID

import com.typesafe.config.Config
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

import scala.collection.mutable
import scala.io.Source

/**
 * Creates the dataset by passing the RDD through the various processing steps.
 */
object DataHandlerService extends DataHandlerTrait {

  // Reads a classpath resource (here the JSON schema file) into a String.
  def loadResource(filename: String): String = {
    val path = getClass.getResource(filename)
    // Source.fromInputStream(getClass.getResourceAsStream("/war-and-peace.txt")).getLines().foreach(println)
    val source = Source.fromURL(path)
    try source.mkString finally source.close()
  }

  def processMessages(messages: InputDStream[ConsumerRecord[String, String]],
                      sparkSession: SparkSession,
                      errorDirPath: String,
                      successPath: String,
                      kafkaSink: Broadcast[KafkaSinkService],
                      brValidators: Broadcast[mutable.Map[String, Array[Validator]]],
                      brDefaultValues: Broadcast[mutable.Map[String, Any]],
                      brTransFormerMap: Broadcast[mutable.Map[String, AttributeTransformer]],
                      brConfig: Broadcast[Config]): Unit = {

    val schema = SchemaService.getSchemaDefinition()
    val encoder = RowEncoder.apply(schema)

    // Load the schema JSON from the classpath and turn it into a StructType.
    val fileContents = loadResource("/test-schema")
    println(fileContents)
    val schema3 = DataType.fromJson(fileContents).asInstanceOf[StructType]
    println(schema3)

    messages.foreachRDD(rdd => {
      import sparkSession.implicits._

      // Capture the Kafka offsets before any transformation is applied.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      if (!rdd.isEmpty()) { // skip empty micro-batches
        val dataSet: Dataset[String] = sparkSession.createDataset(rdd.map(x => x.value()))
        dataSet.show(2)

        // Parse the JSON payload with the explicit schema; malformed records land in corrupt_record.
        var streamData = sparkSession.read
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "corrupt_record")
          .schema(schema3)
          .json(dataSet)
        // val streamData = sparkSession.read.schema(schema3).json(dataSet)
        // streamData.write.json("/Users/z002qhl/Documents/Spark/SparkProjects/sparkStreaming/table")
        // streamData.write.mode("append").option("compression", "snappy").format("orc").saveAsTable("z002qhl.testtable")
        streamData.printSchema()
        // streamData.foreach(mes => println("msg:" + mes.mkString))

        val generateUUID = udf(() => UUID.randomUUID().toString)
        // val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
        // streamData.withColumn("ts", ts)

        // Decorate every row with audit / tracking columns.
        streamData = streamData
          .withColumn("decorated", lit(1))
          .withColumn("current_date", current_date())
          .withColumn("unix_timestamp", unix_timestamp())
          .withColumn("Next_processing_date", date_add(current_date(), 10))
          .withColumn("uuid", generateUUID())
          .withColumn("concatedrows", concat('action, 'atp_tracking_id))
          .withColumn("casting", $"action".substr(0, 3)) // Column.substr positions are 1-based; 0 is treated as 1
        streamData.show(5)
      }

      // Commit the offsets only after the batch has been processed.
      messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
  }
}
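The /test-schema resource itself is not reproduced in the post. As a rough sketch of what Spark expects there, the snippet below builds a hypothetical schema JSON (field names borrowed from the action and atp_tracking_id columns used later) and parses it with DataType.fromJson the same way processMessages does:

import org.apache.spark.sql.types.{DataType, StructType}

object SchemaJsonSketch extends App {
  // Hypothetical contents of /test-schema; the real file is not shown in the post.
  val schemaJson =
    """{
      |  "type": "struct",
      |  "fields": [
      |    {"name": "action",          "type": "string", "nullable": true, "metadata": {}},
      |    {"name": "atp_tracking_id", "type": "string", "nullable": true, "metadata": {}}
      |  ]
      |}""".stripMargin

  // DataType.fromJson returns a DataType; cast to StructType as in processMessages.
  val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
  schema.printTreeString() // root |-- action: string (nullable = true) ...
}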
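The withColumn chain is the column-transformation part the title refers to, and it can be tried outside the streaming job. The two input rows and the local SparkSession below are made up purely for illustration:

import java.util.UUID
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ColumnTransformSketch extends App {
  val spark = SparkSession.builder().appName("column-transform-sketch").master("local[*]").getOrCreate()
  import spark.implicits._

  val generateUUID = udf(() => UUID.randomUUID().toString)

  // Made-up sample rows with the two columns referenced by the transformations.
  val df = Seq(("CREATE", "TRK-001"), ("UPDATE", "TRK-002")).toDF("action", "atp_tracking_id")

  val decorated = df
    .withColumn("decorated", lit(1))                                  // constant marker column
    .withColumn("current_date", current_date())                       // current date
    .withColumn("unix_timestamp", unix_timestamp())                   // seconds since epoch
    .withColumn("Next_processing_date", date_add(current_date(), 10)) // 10 days from now
    .withColumn("uuid", generateUUID())                               // one UUID per row
    .withColumn("concatedrows", concat($"action", $"atp_tracking_id"))
    .withColumn("casting", $"action".substr(1, 3))                    // substr positions are 1-based

  decorated.show(false)
  spark.stop()
}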
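Finally, the offset handling: offsetRanges is taken from the raw Kafka RDD at the top of foreachRDD and committed with commitAsync only after the batch has been processed. The sketch below shows the standard direct-stream wiring this pattern sits in; broker address, group id and topic name are placeholders, and the application-specific broadcasts (validators, sink, config) are left out:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object OffsetCommitSketch extends App {
  val conf = new SparkConf().setAppName("offset-commit-sketch").setMaster("local[*]")
  val ssc  = new StreamingContext(conf, Seconds(30))

  val kafkaParams = Map[String, Object](
    "bootstrap.servers"  -> "localhost:9092",          // placeholder broker address
    "key.deserializer"   -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id"           -> "data-handler-sketch",     // placeholder consumer group
    "auto.offset.reset"  -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean) // offsets are committed manually below
  )

  val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Seq("test-topic"), kafkaParams)) // placeholder topic

  messages.foreachRDD { rdd =>
    // Offsets must be read from the original Kafka RDD, before it is transformed.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... build the Dataset and apply the column transformations here, as in processMessages ...
    messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

  ssc.start()
  ssc.awaitTermination()
}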