import org.apache.spark.sql.types.{DataType, StructType}

// Load the JSON schema definition from the classpath and turn it into
// a Spark StructType that can be applied to the incoming messages.
val fileContents = loadResource("/input-schema")
print(fileContents)
val schema3 = DataType.fromJson(fileContents).asInstanceOf[StructType]
print(schema3)
messages.foreachRDD(rdd => {
  import sparkSession.implicits._
  // Capture the Kafka offset ranges before transforming the RDD.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  if (!rdd.isEmpty()) {
    // Pull the message values out into a Dataset of raw JSON strings.
    val dataSet: Dataset[String] = sparkSession.createDataset(rdd.map(x => x.value()))
    dataSet.show(2)
    // Parse the JSON with the schema loaded above; PERMISSIVE mode routes
    // malformed records into the corrupt_record column instead of failing the batch.
    val streamData = sparkSession.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "corrupt_record")
      .schema(schema3)
      .json(dataSet)
    streamData.show(5)
    // Alternative sinks, kept here for reference:
    // streamData.write.json("/Users/z002qhl/Documents/Spark/SparkProjects/sparkStreaming/table")
    // streamData.write.mode("append").option("compression", "snappy").format("orc").saveAsTable("z002qhl.testtable")
    streamData.printSchema()
    // Per-record processing goes here, e.g. extracting the action field:
    streamData.map(row => row.getAs[String]("action"))
  }
  // Commit the offsets back to Kafka once the batch has been processed.
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
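The messages DStream and sparkSession above are assumed to be created earlier in the job. As a minimal sketch of how such a direct stream is typically wired up with the spark-streaming-kafka-0-10 connector (the broker address, group id, topic name, and streamingContext below are placeholders, not taken from this post):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",           // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "stream-consumer-group",             // placeholder group id
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean) // offsets are committed manually via commitAsync
)

val messages = KafkaUtils.createDirectStream[String, String](
  streamingContext,                                  // assumed StreamingContext
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("input-topic"), kafkaParams)
)

Disabling enable.auto.commit matters here because the foreachRDD block commits offsets itself only after a batch has been processed.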
import scala.io.Source

// Read a classpath resource into a String, closing the underlying
// source even if reading fails.
def loadResource(filename: String) = {
  val path = getClass.getResource(filename)
  val source = Source.fromURL(path)
  try source.mkString finally source.close()
}
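Assuming the standard sbt/Maven project layout, the /input-schema file below lives under src/main/resources, so getClass.getResource can resolve it from the classpath at runtime.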
{
"type": "struct",
"fields": [
{
"name": "action",
"type": "string",
"nullable": true,
"metadata": {
}
},
{
"name": "tracking_id",
"type": "string",
"nullable": true,
"metadata": {
}
},
{
"name": "corrupt_record",
"type": "string",
"nullable": true,
"metadata": {
}
}
]
}
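Parsing this JSON with DataType.fromJson produces the same structure as building the schema programmatically. For reference, a minimal equivalent StructType (field names taken from the JSON above):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Programmatic equivalent of the /input-schema JSON.
val inputSchema = StructType(Seq(
  StructField("action", StringType, nullable = true),
  StructField("tracking_id", StringType, nullable = true),
  StructField("corrupt_record", StringType, nullable = true)
))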