import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Load the JSON schema definition from the classpath and parse it into a StructType.
val fileContents = loadResource("/input-schema")
println(fileContents)
val schema3 = DataType.fromJson(fileContents).asInstanceOf[StructType]
println(schema3)

messages.foreachRDD(rdd => {
  import sparkSession.implicits._

  // Capture the Kafka offset ranges of this batch so they can be committed after processing.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  if (!rdd.isEmpty()) {
    // Pull the record values out of the ConsumerRecords into a Dataset of raw JSON strings.
    val dataSet: Dataset[String] = sparkSession.createDataset(rdd.map(x => x.value()))
    dataSet.show(2)

    // Parse the JSON against the predefined schema; in PERMISSIVE mode malformed records
    // land in the corrupt_record column instead of failing the batch.
    val streamData = sparkSession.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "corrupt_record")
      .schema(schema3)
      .json(dataSet)
    // val streamData = sparkSession.read.schema(schema3).json(dataSet)
    streamData.show(5)

    // streamData.write.json("/Users/z002qhl/Documents/Spark/SparkProjects/sparkStreaming/table")
    // streamData.write.mode("append").option("compression", "snappy").format("orc").saveAsTable("z002qhl.testtable")

    println("schema::")
    streamData.printSchema()

    // streamData.map(mes => mes.)
  }

  // Commit the processed offsets back to Kafka asynchronously.
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
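The block above assumes that a sparkSession and a Kafka direct stream named messages already exist. A minimal sketch of how they might be created with the spark-streaming-kafka-0-10 integration; the broker address, topic name, group id, and batch interval below are placeholders, not values from the original setup:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Placeholder session and a 10-second micro-batch streaming context.
val sparkSession = SparkSession.builder().appName("kafka-json-stream").getOrCreate()
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",            // placeholder broker
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "stream-group",              // placeholder consumer group
  "auto.offset.reset"  -> "latest",
  // Offsets are committed manually with commitAsync, so auto-commit stays off.
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Direct stream whose batch RDDs expose HasOffsetRanges, as used in foreachRDD above.
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("input-topic"), kafkaParams)  // placeholder topic
)

Calling ssc.start() and ssc.awaitTermination() after wiring up foreachRDD would then run the stream.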
loadResource reads a classpath resource into a single string:

import scala.io.Source

def loadResource(filename: String): String = {
  val path = getClass.getResource(filename)
  // Source.fromInputStream(getClass.getResourceAsStream("/war-and-peace.txt")).getLines().foreach(println)
  val source = Source.fromURL(path)
  try source.mkString finally source.close()
}

The /input-schema resource on the classpath holds the Spark schema as JSON:

{
  "type": "struct",
  "fields": [
    { "name": "action", "type": "string", "nullable": true, "metadata": {} },
    { "name": "tracking_id", "type": "string", "nullable": true, "metadata": {} },
    { "name": "corrupt_record", "type": "string", "nullable": true, "metadata": {} }
  ]
}
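For comparison, the same three-column schema can be built directly with StructType and StructField instead of being loaded from the JSON resource; a sketch equivalent to the file above:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Equivalent to the /input-schema JSON: three nullable string columns,
// including the corrupt_record column used by the PERMISSIVE JSON reader.
val schema = StructType(Seq(
  StructField("action", StringType, nullable = true),
  StructField("tracking_id", StringType, nullable = true),
  StructField("corrupt_record", StringType, nullable = true)
))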
