
Saturday, April 13, 2024

Spark consumer for an Avro topic

Launch spark-shell with the Kafka connector, spark-avro, and the Confluent Schema Registry client. The Confluent artifacts are not on Maven Central, hence the extra --repositories entry:

./spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,\
org.apache.spark:spark-avro_2.12:3.5.1,\
io.confluent:kafka-schema-registry-client:7.3.2,\
io.confluent:kafka-avro-serializer:7.3.2,\
io.confluent:kafka-streams-avro-serde:7.3.2 \
--repositories https://packages.confluent.io/maven/



import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._



val schemaRegistryAddr = "https://schemaregistry.dev.basan.com"
// Note: OSS Spark's from_avro does not read this conf; the registry address
// is passed to the client directly below.
spark.conf.set("schema.registry.url", schemaRegistryAddr)
val subjectName = "basan-events-value"

// Options forwarded to from_avro: mode=PERMISSIVE turns records that fail to
// deserialize into nulls instead of failing the query. avroSchemaEvolutionMode
// appears to be Databricks-specific and is ignored by OSS Spark.
val schemaRegistryOptions = Map(
  "avroSchemaEvolutionMode" -> "none",
  "mode" -> "PERMISSIVE")


// Spark's Kafka source always returns key and value as binary, so Kafka
// deserializer classes are not needed here (Spark ignores them); the Avro
// decoding happens later with from_avro.
val kafkaDF: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-basan.com:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/basan/client.truststore.jks")
  .option("kafka.ssl.truststore.password", "replacewithcorrectpassword")
  .option("kafka.ssl.keystore.location", "/basan/server.keystore.jks")
  .option("kafka.ssl.keystore.password", "replacewithcorrectpassword")
  .option("kafka.ssl.endpoint.identification.algorithm", "") // empty value disables hostname verification
  .option("startingOffsets", "earliest")
  .option("subscribe", "basan-events")
  .load()


//kafkaDF.limit(10).writeStream.format("console").option("truncate", true).start().awaitTermination()
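For reference, the Kafka source always exposes the same fixed set of columns, with key and value arriving as raw bytes; printSchema confirms this (output reproduced as comments):

kafkaDF.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)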


import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.schemaregistry.client.SchemaMetadata


// 500 is the maximum number of schemas the client caches locally.
val client: SchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryAddr, 500)


// Fetch the latest schema registered for the subject, then resolve the full
// schema JSON by id.
val schemaMetadata: SchemaMetadata = client.getLatestSchemaMetadata(subjectName)
val schemaId: Int = schemaMetadata.getId()
val schemaString: String = client.getById(schemaId).toString()
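Since getById is deprecated in recent schema-registry-client releases, the same schema JSON is also available straight from the metadata, avoiding the second lookup (a sketch against the 7.x client API):

// SchemaMetadata already carries the schema definition as a JSON string.
val schemaStringAlt: String = schemaMetadata.getSchema()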


//val processedDf = kafkaDF.select(from_avro($"value", schemaString,schemaRegistryOptions.asJava).as("value"))

// Records written with the Confluent Avro serializer carry a 5-byte prefix
// (magic byte 0x0 + 4-byte schema id) ahead of the Avro payload. from_avro
// expects the bare payload, and substring is 1-based, so substring(value, 6)
// strips exactly those 5 bytes.
val processedDf2 = kafkaDF
  .selectExpr("substring(value, 6) as value")
  .select(from_avro($"value", schemaString, schemaRegistryOptions.asJava).as("value"))

//val processedDf = kafkaDF.select(from_avro($"value", schemaString).as("value"))

//val processedDf = kafkaDF.select(from_avro($"value", responseJson,schemaRegistryOptions.asJava).as("value"))



//processedDf.limit(2).writeStream.format("console").option("truncate", false).start().awaitTermination()
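To sanity-check that the records really follow this wire format, a small sketch (the extractSchemaId helper is illustrative, not part of the original post) can decode the big-endian schema id embedded in each message and compare it against the schemaId fetched from the registry above:

import java.nio.ByteBuffer
import org.apache.spark.sql.functions.udf

// Bytes 2..5 (1-based) hold the schema id, big-endian.
val extractSchemaId = udf((bytes: Array[Byte]) => ByteBuffer.wrap(bytes).getInt)

val schemaIdsDF = kafkaDF
  .selectExpr("substring(value, 2, 4) as idBytes")
  .select(extractSchemaId($"idBytes").as("schemaId"))

// Preview with the console sink, as with kafkaDF above. Every row should
// show the same id unless multiple schema versions are live on the topic.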



// Write up to 500 decoded records out as JSON; the file sink requires a
// checkpoint location.
processedDf2.limit(500).writeStream
  .format("json")
  .option("path", "/Users/basan/Documents/dataingestion/avroingestion/data")
  .option("checkpointLocation", "/Users/basan/Documents/dataingestion/avroingestion/checkpointLocation")
  .start()
  .awaitTermination()