Powered By Blogger

Monday, May 3, 2021

Write a JSON file to HDFS with Spark

 

Pass a valid HDFS path. The method creates a single file at that path, overwriting any file that already exists there.

/**
 * Writes `fileBody` as a single file at `fileLocation`, replacing any existing file there.
 *
 * The file system is resolved from the path itself. A scheme-less path therefore goes to the
 * default file system, as before. A fully qualified URI goes to the file system it names,
 * instead of failing with "Wrong FS".
 *
 * @param fileLocation target file path (scheme-less or fully qualified URI)
 * @param fileBody     text to write; encoded as UTF-8
 * @param sparkSession session whose Hadoop configuration is used to open the file system
 * @throws WriterException if resolving, creating, writing or closing the file fails
 */
@throws[WriterException]
def writeFileToHDFS(fileLocation: String, fileBody: String)(sparkSession: SparkSession): Unit = {
  logInfo(s"Write the contents to the file location $fileLocation")
  try {
    val hadoopConf: Configuration = sparkSession.sparkContext.hadoopConfiguration
    val exportToFile = new Path(fileLocation)
    val fs: FileSystem = exportToFile.getFileSystem(hadoopConf)
    // overwrite = true: an existing file at this path is replaced.
    val out: FSDataOutputStream = fs.create(exportToFile, true)
    try {
      // Encode explicitly as UTF-8 (the charset Spark's text reader decodes with)
      // rather than the JVM's platform-default charset.
      out.write(fileBody.getBytes(java.nio.charset.StandardCharsets.UTF_8))
    } finally {
      logInfo("Closing the writer object")
      // close() flushes the stream and finalises the file. It sits inside the outer try,
      // so a failure here is reported as a WriterException like any other write error.
      out.close()
    }
  } catch {
    case ex: Exception =>
      logError(s"Error in writing to the file $fileLocation", ex)
      throw WriterException(s"Error in writing to the file $fileLocation")
  }
}



If we write through a DataFrame instead, Spark creates a directory at the given path and writes the file inside it. Because of `coalesce(1)`, the output is a single part file.

/**
 * Writes `fileBody` through Spark's text data source, in overwrite mode.
 *
 * Unlike writing the file directly, `fileLocation` becomes an output directory, and the text
 * is written as a part file inside it.
 *
 * @param fileLocation output directory path
 * @param fileBody     text to write as the single row of the output
 * @param sparkSession session used to build the one-row DataFrame
 */
def writeFileToHDFSUsingDataFrame(fileLocation: String, fileBody : String)(sparkSession: SparkSession): Unit = {
  import sparkSession.implicits._
  // One row, one partition: the write produces a single part file.
  val singleRowFrame = Seq(fileBody).toDF().coalesce(1)
  val overwritingWriter = singleRowFrame.write.mode("overwrite")
  overwritingWriter.text(fileLocation)
}

Read the content of the file as is with Spark

Sample JSON file:


{
"sharedObjectName": "sharedObjectName",
"writeInProgress":false,
"tableName":"item",
"lastSyncTimeInEpoch":1619783310,
"latestLocation":"latestLocation/latestLocation/latestLocation",
"oneStepLagLocation":"oneStepLagLocation/oneStepLagLocation/oneStepLagLocation"
}



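Spark code to read the file. Note that `textFile` splits the input into lines and the code concatenates them, so the returned string does not contain the original line breaks.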

/**
 * Reads the text at `location` and returns it as one string.
 *
 * All lines are collected to the driver, so this is only suitable for small files such as
 * configuration JSON. Lines are concatenated without a separator: line breaks from the source
 * are not kept.
 *
 * @param location     file (or directory) path readable by `sparkSession.read.textFile`
 * @param sparkSession session used to read the file
 * @return the concatenated lines of the file
 * @throws ConfigException if the read fails for any reason
 */
@throws[ConfigException]
def readJSONFileAsIs( location: String)(sparkSession:SparkSession): String = {
  logInfo(s"Read the file contents as is from the location $location")
  try {
    val collectedLines: Array[String] = sparkSession.read.textFile(location).collect()
    val fileContents: String = collectedLines.mkString
    logInfo(s"Content of the file $fileContents")
    fileContents
  } catch {
    case exception: Exception =>
      logError(s"Error in reading the file from the location $location", exception)
      throw ConfigException(s"Error in reading the file from the location $location")
  }
}