./spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,\
org.apache.spark:spark-avro_2.12:3.5.1,\
io.confluent:kafka-schema-registry-client:7.3.2,\
io.confluent:kafka-avro-serializer:7.3.2,\
io.confluent:kafka-streams-avro-serde:7.3.2 \
--repositories https://packages.confluent.io/maven/
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://schemaregistry.dev.basan.com"
// Note: open-source Spark's from_avro does not read this conf; the schema
// string is fetched from the registry and passed to from_avro explicitly below.
spark.conf.set("schema.registry.url", schemaRegistryAddr)
val subjectName = "basan-events-value"
// Options passed to from_avro: PERMISSIVE mode turns records that fail to
// deserialize into nulls instead of failing the stream.
val schemaRegistryOptions = Map(
  "avroSchemaEvolutionMode" -> "none",
  "mode" -> "PERMISSIVE")
// Spark's Kafka source always returns key and value as raw binary columns, so
// Confluent's KafkaAvroDeserializer must not be configured here; the Avro
// payload is decoded with from_avro further down.
val kafkaDF: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-basan.com:9093")
  .option("subscribe", "basan-events")
  .option("startingOffsets", "earliest")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/basan/client.truststore.jks")
  .option("kafka.ssl.truststore.password", "replacewithcorrectpassword")
  .option("kafka.ssl.keystore.location", "/basan/server.keystore.jks")
  .option("kafka.ssl.keystore.password", "replacewithcorrectpassword")
  // Empty algorithm disables hostname verification; tighten for production.
  .option("kafka.ssl.endpoint.identification.algorithm", "")
  .load()
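// The source DataFrame exposes Kafka metadata alongside the raw bytes:
// key, value (binary), topic, partition, offset, timestamp, timestampType.
kafkaDF.printSchema()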
// Quick console check (note: limit() is not supported on streaming DataFrames,
// so cap input with the maxOffsetsPerTrigger source option instead if needed):
//kafkaDF.writeStream.format("console").option("truncate", false).start().awaitTermination()
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient}
// 500 is the client's schema cache capacity (identityMapCapacity).
val client: SchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryAddr, 500)
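// Optional sanity check: confirm connectivity and that the subject exists.
client.getAllSubjects().asScala.foreach(println)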
// Get the latest schema metadata by subject
val schemaMetadata: SchemaMetadata = client.getLatestSchemaMetadata(subjectName)
val schemaId: Int = schemaMetadata.getId()
// getById returns the Avro Schema object; toString() renders it as the JSON
// schema string that from_avro expects.
val schemaString: String = client.getById(schemaId).toString()
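println(s"Using schema id $schemaId: $schemaString")
// Caveat: decoding with the latest registered schema assumes every record on
// the topic was written with it. Each Confluent-framed payload carries its own
// schema id in bytes 1-4; a minimal helper (hypothetical, for spot checks):
def embeddedSchemaId(payload: Array[Byte]): Int =
  java.nio.ByteBuffer.wrap(payload, 1, 4).getInt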
// Confluent's wire format prefixes each Avro payload with a magic byte (0x0)
// and a 4-byte schema id. from_avro expects a bare Avro body, so skip the
// first 5 bytes (SQL substring is 1-based, hence position 6) before decoding.
val processedDf = kafkaDF
  .selectExpr("substring(value, 6) as value")
  .select(from_avro($"value", schemaString, schemaRegistryOptions.asJava).as("value"))
//processedDf.writeStream.format("console").option("truncate", false).start().awaitTermination()
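// Optional: expand the decoded struct into top-level columns named after the
// Avro schema's fields.
val flattenedDf = processedDf.select($"value.*")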
// limit() is not supported on streaming DataFrames; bound each micro-batch
// with the maxOffsetsPerTrigger source option instead if needed.
processedDf.writeStream
  .format("json")
  .option("path", "/Users/basan/Documents/dataingestion/avroingestion/data")
  .option("checkpointLocation", "/Users/basan/Documents/dataingestion/avroingestion/checkpointLocation")
  .start()
  .awaitTermination()