
Monday, May 3, 2021

Write a JSON file to HDFS with Spark

 

Pass a valid HDFS path. The method writes a single file at that path, overwriting it if it already exists.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.SparkSession

@throws[WriterException]
def writeFileToHDFS(fileLocation: String, fileBody: String)(sparkSession: SparkSession): Unit = {
  logInfo(s"Write the contents to the file location $fileLocation")
  var fsDataOutputStream: FSDataOutputStream = null
  try {
    val hadoopConf: Configuration = sparkSession.sparkContext.hadoopConfiguration
    val hdfs: FileSystem = FileSystem.get(hadoopConf)
    val exportToFile = new Path(fileLocation)

    fsDataOutputStream = hdfs.create(exportToFile)
    fsDataOutputStream.write(fileBody.getBytes)
    fsDataOutputStream.hflush()
  } catch {
    case ex: Exception =>
      logError(s"Error in writing to the file $fileLocation", ex)
      throw WriterException(s"Error in writing to the file $fileLocation")
  } finally {
    logInfo("Closing the writer object")
    if (fsDataOutputStream != null) {
      fsDataOutputStream.close()
    }
  }
}
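
A minimal sketch of a call site, assuming a SparkSession is already available and the path is writable (the path and payload below are only illustrative):

// Sketch only: hypothetical session, path and payload.
val spark: SparkSession = SparkSession.builder().appName("hdfs-writer").getOrCreate()
val payload = """{"sharedObjectName":"sharedObjectName","writeInProgress":false}"""

writeFileToHDFS("/tmp/shared-object/shared_object.json", payload)(spark)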



If we write through a DataFrame instead, Spark creates a folder at the given location and writes the content as a part file inside it, rather than a single named file.

def writeFileToHDFSUsingDataFrame(fileLocation: String, fileBody: String)(sparkSession: SparkSession): Unit = {
  import sparkSession.implicits._
  Seq(fileBody)
    .toDF()
    .coalesce(1)
    .write
    .mode("overwrite")
    .text(fileLocation)
}
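
For comparison, a sketch of calling it with the same hypothetical payload as above, and the kind of layout it produces:

// Sketch only: reuses the hypothetical spark session and payload from the earlier sketch.
writeFileToHDFSUsingDataFrame("/tmp/shared-object-df", payload)(spark)

// fileLocation becomes a directory; the content lands in a part file, typically:
//   /tmp/shared-object-df/_SUCCESS
//   /tmp/shared-object-df/part-00000-<uuid>-c000.txt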

Read the content of a file as-is with Spark

Sample JSON file


{
  "sharedObjectName": "sharedObjectName",
  "writeInProgress": false,
  "tableName": "item",
  "lastSyncTimeInEpoch": 1619783310,
  "latestLocation": "latestLocation/latestLocation/latestLocation",
  "oneStepLagLocation": "oneStepLagLocation/oneStepLagLocation/oneStepLagLocation"
}



Spark code to read the file

import org.apache.spark.sql.{Dataset, SparkSession}

@throws[ConfigException]
def readJSONFileAsIs(location: String)(sparkSession: SparkSession): String = {
  logInfo(s"Read the file contents as is from the location $location")
  try {
    val textFileDataFrame: Dataset[String] = sparkSession.read.textFile(location)
    val builder: StringBuilder = new StringBuilder()
    textFileDataFrame.collect().foreach(line => builder.append(line))
    val fileContents: String = builder.toString()
    logInfo(s"Content of the file $fileContents")
    fileContents
  } catch {
    case exception: Exception =>
      logError(s"Error in reading the file from the location $location", exception)
      throw ConfigException(s"Error in reading the file from the location $location")
  }
}
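
A sketch of reading back the file written earlier (the path is the hypothetical one from the first example). Note that the lines are concatenated without separators, which is fine for JSON but collapses the original line breaks:

// Sketch only: read the file back as one string.
val rawJson: String = readJSONFileAsIs("/tmp/shared-object/shared_object.json")(spark)
println(rawJson)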

Friday, April 30, 2021

Convert JSON to an object in Scala using Jackson


{
  "sharedObjectName": "sharedObjectName",
  "writeInProgress": false,
  "tableName": "item",
  "lastSyncTimeInEpoch": 1619783310,
  "latestLocation": "latestLocation/latestLocation/latestLocation",
  "oneStepLagLocation": "oneStepLagLocation/oneStepLagLocation/oneStepLagLocation"
}


Case class

case class SharedObject(
  var sharedObjectName: String,
  var writeInProgress: Boolean = false,
  var tableName: String = null,
  var lastSyncTimeInEpoch: Long = 0,
  var latestLocation: String = null,
  var oneStepLagLocation: String = null)


Code to convert the JSON into the case class

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

def convertJSONToSharedObject(inputMessage: String): SharedObject = {
  log.info(s"Input message to be converted is $inputMessage")
  try {
    val mapper: ObjectMapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    val parsed: SharedObject = mapper.readValue(inputMessage, classOf[SharedObject])
    parsed
  } catch {
    case e: Exception =>
      throw ConfigException(s"Message $inputMessage is not parsable")
  }
}
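
A quick sketch of parsing the sample JSON shown above, assuming SharedObject and the converter compile as-is:

// Sketch only: parse the sample payload and read a couple of fields back.
val sampleJson: String =
  """{"sharedObjectName":"sharedObjectName","writeInProgress":false,"tableName":"item",
    |"lastSyncTimeInEpoch":1619783310,"latestLocation":"latestLocation/latestLocation/latestLocation",
    |"oneStepLagLocation":"oneStepLagLocation/oneStepLagLocation/oneStepLagLocation"}""".stripMargin

val shared: SharedObject = convertJSONToSharedObject(sampleJson)
println(shared.tableName)           // item
println(shared.lastSyncTimeInEpoch) // 1619783310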

Saturday, January 23, 2021

SBT hacks


Skipping test cases in the build

  sbt 'set test in assembly := {}' clean assembly

  sbt 'set test in assembly := {}' clean package
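
If tests should be skipped on every assembly rather than per invocation, the same setting can live in build.sbt instead. A sketch, using the same older `key in scope` syntax as the rest of this post:

// build.sbt: skip running tests as part of `sbt assembly`
// (same effect as the command-line `set` shown above)
test in assembly := {}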


Referring to a local jar in sbt

unmanagedJars in Compile += file("/Users/basan/Documents/Projects/application/application-core/target/scala-2.11/application-core-assembly-1.8.2.jar")

https://www.scala-sbt.org/1.x/docs/Library-Management.html

unmanagedBase := baseDirectory.value / "custom_lib"
https://github.com/sbt/sbt-assembly/blob/master/src/sbt-test/sbt-assembly/deps/build.sbt


lazy val root = (project in file(".")).
  settings(
    version := "0.1",
    scalaVersion := "2.11.12",
    libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.1" % "test",
    libraryDependencies += "ch.qos.logback" % "logback-classic" % "0.9.29" % "runtime",
    unmanagedJars in Compile ++= {
      (baseDirectory.value / "lib" / "compile" ** "*.jar").classpath
    },
    unmanagedJars in Runtime ++= {
      (baseDirectory.value / "lib" / "runtime" ** "*.jar").classpath
    },
    unmanagedJars in Test ++= {
      (baseDirectory.value / "lib" / "test" ** "*.jar").classpath
    },
    assemblyExcludedJars in assembly := {
      (fullClasspath in assembly).value filter { _.data.getName == "compile-0.1.0.jar" }
    },
    assemblyJarName in assembly := "foo.jar",
    TaskKey[Unit]("check") := {
      val process = sys.process.Process("java", Seq("-jar", (crossTarget.value / "foo.jar").toString))
      val out = (process!!)
      if (out.trim != "hello") sys.error("unexpected output: " + out)
      ()
    }
  )

Sunday, January 10, 2021

Typesafe config map



app {
  # application name
  name: "Streaming"

  dataportal {
    timelinessKafka {
      topicName = "kelsa-DIP-replace"
      producerDetails {
        bootstrap.servers = "broker:9093"
        client.id = "application name"
        key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
        value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
        ssl.truststore.location = "Data/certificates/metakafka/client.truststore_stg.jks"
        ssl.keystore.location = "Data/certificates/metakafka/server.keystore_stg.jks"
        security.protocol = "SSL"
        ssl.protocol = "TLSv1.2"
        ssl.endpoint.identification.algorithm = ""
      }
    }
  }
}
spark-shell --packages com.typesafe:config:1.3.3

import com.typesafe.config._
import java.io.File
import scala.collection.JavaConverters._


val path = "/Users/basan/Documents/Projects/metricPush2/application.conf"
val config = ConfigFactory.parseFile(new File(path))

val producerDetails = config.getConfig("app.dataportal.timelinessKafka.producerDetails")
producerDetails.getString("bootstrap.servers")


producerDetails
  .entrySet()
  .asScala
  .map(entry => entry.getKey -> entry.getValue.unwrapped().toString)
  .toMap

res7: scala.collection.immutable.Map[String,String] = Map(ssl.truststore.location -> Data/certificates/metakafka/client.truststore_stg.jks, security.protocol -> SSL, bootstrap.servers -> broker:9093, client.id -> application name, value.serializer -> org.apache.kafka.common.serialization.StringSerializer, ssl.protocol -> TLSv1.2, ssl.endpoint.identification.algorithm -> "", key.serializer -> org.apache.kafka.common.serialization.StringSerializer, ssl.keystore.location -> Data/certificates/metakafka/server.keystore_stg.jks)


config
  .entrySet()
  .asScala
  .map(entry => entry.getKey -> entry.getValue.toString)
  .toMap

res5: scala.collection.immutable.Map[String,String] = Map(app.dataportal.timelinessKafka.producerDetails.ssl.keystore.location -> Quoted("Data/certificates/metakafka/server.keystore_stg.jks"), app.contactDetails -> Quoted("DataSciences-DE-GSCL-DigitalFulfillment@Target.com"), app.teamName -> Quoted("GSCL-Digital Fulfillment"), app.dataportal.timelinessKafka.producerDetails.key.serializer -> Quoted("org.apache.kafka.common.serialization.StringSerializer"), app.dataportal.timelinessKafka.producerDetails.value.serializer -> Quoted("org.apache.kafka.common.serialization.StringSerializer"), app.dataportal.timelinessKafka.producerDetails.bootstrap.servers -> Quoted("broker:9093"), app.kelsaApplicationPath -> Quoted("Data/KelsaPath/"), app.dataportal.timelinessKafka.producerDetails.client.id -...
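
Since the producerDetails entries above are exactly Kafka producer properties, a natural next step is to load that map into java.util.Properties and hand it to a producer. A minimal sketch, assuming the kafka-clients jar is on the classpath and the SSL files referenced in the config exist:

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

// Copy every key/value from the Typesafe config block into Properties.
val producerProps = new Properties()
producerDetails
  .entrySet()
  .asScala
  .foreach(entry => producerProps.put(entry.getKey, entry.getValue.unwrapped().toString))

// Both key and value are strings, matching the serializers declared in the config.
val producer = new KafkaProducer[String, String](producerProps)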