
Tuesday, October 29, 2019

Kafka message validation

// Load the expected message schema from a JSON resource on the classpath
// and deserialize it into a Spark StructType.
import org.apache.spark.sql.types.{DataType, StructType}

val fileContents = loadResource("/input-schema")
println(fileContents)

val schema3 = DataType.fromJson(fileContents).asInstanceOf[StructType]
println(schema3)
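The batches below come from a Kafka direct stream named messages, which the original snippet does not show being created. A minimal sketch of how it might be set up with the spark-streaming-kafka-0-10 API follows; the broker address, group id, and topic name are placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

// Placeholder connection settings; auto-commit is disabled because offsets
// are committed manually after each batch is processed.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "message-validation",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val messages = KafkaUtils.createDirectStream[String, String](
  ssc, // the StreamingContext
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("input-topic"), kafkaParams)
)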


import org.apache.spark.sql.Dataset
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

messages.foreachRDD(rdd => {
  import sparkSession.implicits._

  // Capture the offset ranges up front so they can be committed once the
  // batch has been processed.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  if (!rdd.isEmpty()) { // isEmpty already covers count() > 0
    val dataSet: Dataset[String] = sparkSession.createDataset(rdd.map(x => x.value()))
    dataSet.show(2)

    // Parse the raw JSON against the expected schema. PERMISSIVE mode keeps
    // malformed records instead of failing the batch, putting their raw text
    // into the column named by columnNameOfCorruptRecord.
    val streamData = sparkSession.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "corrupt_record")
      .schema(schema3)
      .json(dataSet)
      .cache() // cache before filtering on the corrupt-record column

    streamData.show(5)
    streamData.printSchema()

    // Split the batch into valid and corrupt records; corruptData can be
    // routed to a dead-letter sink for inspection.
    val validData = streamData.filter($"corrupt_record".isNull).drop("corrupt_record")
    val corruptData = streamData.filter($"corrupt_record".isNotNull)

    // Valid records can then be persisted, e.g.:
    // validData.write.mode("append").option("compression", "snappy").format("orc").saveAsTable("z002qhl.testtable")
  }

  // Commit offsets only after the batch has been handled (at-least-once semantics).
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
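Nothing runs until the StreamingContext is started. A minimal sketch, assuming ssc is the StreamingContext the stream was built on:

ssc.start()
ssc.awaitTermination()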


// Read a classpath resource into a String, closing the source afterwards.
import scala.io.Source

def loadResource(filename: String): String = {
  val path = getClass.getResource(filename)
  val source = Source.fromURL(path)
  try source.mkString finally source.close()
}
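loadResource is used above to read the /input-schema resource shown below. Equivalently, the same schema could be built programmatically rather than loaded from JSON (a sketch, not from the original post):

import org.apache.spark.sql.types._

val schema3 = StructType(Seq(
  StructField("action", StringType, nullable = true),
  StructField("tracking_id", StringType, nullable = true),
  StructField("corrupt_record", StringType, nullable = true)
))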



{
  "type": "struct",
  "fields": [
    {
      "name": "action",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "tracking_id",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "corrupt_record",
      "type": "string",
      "nullable": true,
      "metadata": {}
    }
  ]
}
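To see the corrupt-record handling in isolation, here is a small self-contained check; the sample records are invented for illustration:

import sparkSession.implicits._

// One well-formed message and one malformed one.
val sample = sparkSession.createDataset(Seq(
  """{"action": "click", "tracking_id": "abc-123"}""",
  """{"action": "click", "tracking_id": }"""
))

val parsed = sparkSession.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "corrupt_record")
  .schema(schema3)
  .json(sample)
  .cache()

parsed.show(truncate = false)
// Row 1: action and tracking_id populated, corrupt_record null.
// Row 2: columns null, raw text preserved in corrupt_record.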
