
Saturday, April 13, 2024

Spark consumer for an Avro topic

./spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,\
org.apache.spark:spark-avro_2.12:3.5.1,\
io.confluent:kafka-schema-registry-client:7.3.2,\
io.confluent:kafka-avro-serializer:7.3.2,\
io.confluent:kafka-streams-avro-serde:7.3.2 \
--repositories http://packages.confluent.io/maven/



import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.avro.functions._

import scala.collection.JavaConverters._



val schemaRegistryAddr = "https://schemaregistry.dev.basan.com"

spark.conf.set("schema.registry.url", schemaRegistryAddr)

val subjectName = "basan-events-value"

val schemaRegistryOptions = Map(
  "avroSchemaEvolutionMode" -> "none",
  "mode" -> "PERMISSIVE")


val kafkaDF: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-basan.com:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/basan/client.truststore.jks")
  .option("kafka.ssl.keystore.location", "/basan/server.keystore.jks")
  .option("startingOffsets", "earliest")
  .option("subscribe", "basan-events")
  .option("kafka.ssl.keystore.password", "replacewithcorrectpassword")
  .option("kafka.ssl.truststore.password", "replacewithcorrectpassword")
  .option("kafka.ssl.endpoint.identification.algorithm", "")
  // The two deserializer options below are ignored by the Spark Kafka source;
  // key and value always arrive as binary and are decoded with from_avro later.
  .option("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
  .option("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
  .load()


//kafkaDF.limit(10).writeStream.format("console").option("truncate", true).start().awaitTermination()


import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient}

// Cache up to 500 schemas in the registry client
val client: SchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryAddr, 500)


    // Get the latest schema metadata by subject

val schemaMetadata: SchemaMetadata = client.getLatestSchemaMetadata(subjectName)

val schemaId: Int = schemaMetadata.getId()

val schemaString: String = client.getById(schemaId).toString()
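
If the topic still holds records written with an older schema version, the same client can also fetch a specific version instead of the latest; a small sketch along the same lines (version 1 here is just an example):

// Fetch a specific schema version for the subject instead of the latest one
val versionedMetadata: SchemaMetadata = client.getSchemaMetadata(subjectName, 1)
val versionedSchemaString: String = client.getById(versionedMetadata.getId()).toString()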


//val processedDf = kafkaDF.select(from_avro($"value", schemaString,schemaRegistryOptions.asJava).as("value"))

// Confluent wire format: 1 magic byte + 4-byte schema id precede the Avro payload.
// substring is 1-based, so starting at position 6 strips that 5-byte header.
val processedDf2 = kafkaDF.selectExpr("substring(value, 6) as value").select(from_avro($"value", schemaString, schemaRegistryOptions.asJava).as("value"))

//val processedDf = kafkaDF.select(from_avro($"value", schemaString).as("value"))

//val processedDf = kafkaDF.select(from_avro($"value", responseJson,schemaRegistryOptions.asJava).as("value"))


//val processedDf = kafkaDF.select(from_avro($"value", schemaString,schemaRegistryOptions.asJava).as("value"))

//processedDf.limit(2).writeStream.format("console").option("truncate", false).start().awaitTermination()
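
For reference, each Confluent-serialized record starts with a magic byte followed by a big-endian 4-byte schema id, and only then the Avro payload. A minimal debugging sketch (not part of the original commands) that surfaces that id per record, which helps when from_avro fails with a schema mismatch:

import org.apache.spark.sql.functions.{col, conv, hex, substring}

// Bytes 2-5 of each record hold the big-endian schema id; byte 1 is the magic byte (0x00).
val withSchemaId = kafkaDF
  .withColumn("schemaId", conv(hex(substring(col("value"), 2, 4)), 16, 10).cast("int"))
  .select("schemaId", "value")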



processedDf2.limit(500).writeStream
  .option("path", "/Users/basan/Documents/dataingestion/avroingestion/data")
  .option("checkpointLocation", "/Users/basan/Documents/dataingestion/avroingestion/checkpointLocation")
  .format("json").start().awaitTermination()


Friday, May 26, 2023

Kafka sink using the Spark Java API - in CDAP

// With the Spark Kafka sink you cannot set a StringSerializer; cast key/value to STRING and let the sink serialize them

DataFrameWriter<Row> dataFrameWriter = keyValueDataSet.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .write()
    .format("kafka")
    .option("topic", config.topicName)
    .option("kafka." + "bootstrap.servers", config.broker)
    .option("kafka.client.id", "basankafka")
    .option("kafka." + "acks", "all");

if (config.sslEnabled) {
    LOG.info("Apply SSL configurations");
    dataFrameWriter = dataFrameWriter.option("kafka." + "ssl.truststore.location", config.trustStoreLocation)
        .option("kafka." + "ssl.keystore.location", config.keyStoreLocation)
        .option("kafka." + "ssl.keystore.password", keyStore)
        .option("kafka." + "ssl.truststore.password", trustStore)
        .option("kafka." + "ssl.protocol", "TLSv1.2")
        .option("kafka." + "security.protocol", "SSL")
        .option("kafka." + "ssl.endpoint.identification.algorithm", " ");
}
if (producerPropsMap != null && producerPropsMap.size() > 0) {
    LOG.info("Apply additional configurations");
    dataFrameWriter = dataFrameWriter.options(producerPropsMap);
}
dataFrameWriter.save();


<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-network-common_2.11</artifactId>
  <version>${spark.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>${spark.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.1.1</version>
</dependency>


//@TODO delete this method after testing
private void sendMessageDirectly(String topicName, String key, String message, String keyStore, String trustStore) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", config.broker);
    // Plain producer config uses "client.id" (the "kafka." prefix is only for Spark options)
    properties.put("client.id", "basanid");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("ssl.truststore.location", config.trustStoreLocation);
    properties.put("ssl.keystore.location", config.keyStoreLocation);
    properties.put("ssl.keystore.password", keyStore);
    properties.put("ssl.truststore.password", trustStore);
    properties.put("ssl.protocol", "TLSv1.2");
    properties.put("security.protocol", "SSL");
    properties.put("ssl.endpoint.identification.algorithm", " ");
    properties.put("acks", "all");

    LOG.info("Sending the message to kafka topic");
    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    // Setting context loader to fix issue with partitioner
    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
    final Producer<String, String> producer = new KafkaProducer<>(properties);
    try {
        producer.send(new ProducerRecord<>(topicName, key, message), (recordMetadata, exception) -> {
            if (exception == null) {
                LOG.info("Data Quality Plugin: Successfully sent the details as: \n" +
                        "Topic: " + recordMetadata.topic() + "\n" +
                        "Partition: " + recordMetadata.partition() + "\n" +
                        "Offset: " + recordMetadata.offset() + "\n" +
                        "Timestamp: " + recordMetadata.timestamp());
            } else {
                LOG.error("Sink Plugin: could not send message {}", exception.getMessage());
            }
        });
    } finally {
        // Setting back to original context loader, otherwise will get treenode issue
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        if (Objects.nonNull(producer)) {
            producer.flush();
            producer.close();
        }
    }
}

//@TODO delete this method after testing
void writeToKafkaByPartition(Dataset<Row> dataset, String keyStore, String trustStore) {
    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    // Setting context loader to fix issue with partitioner
    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
    Properties properties = new Properties();
    properties.put("bootstrap.servers", config.broker);
    // Plain producer config uses "client.id" (the "kafka." prefix is only for Spark options)
    properties.put("client.id", "basanid");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("ssl.truststore.location", config.trustStoreLocation);
    properties.put("ssl.keystore.location", config.keyStoreLocation);
    properties.put("ssl.keystore.password", keyStore);
    properties.put("ssl.truststore.password", trustStore);
    properties.put("ssl.protocol", "TLSv1.2");
    properties.put("security.protocol", "SSL");
    properties.put("ssl.endpoint.identification.algorithm", " ");
    properties.put("acks", "all");
    // Copy the topic name to a local so the lambda below does not capture the enclosing instance
    String topicName = config.topicName;
    try {
        dataset.toJavaRDD().foreachPartition((VoidFunction<Iterator<Row>>) rows -> {
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            while (rows.hasNext()) {
                Row row = rows.next();
                String key = row.getAs("key");
                String value = row.getAs("value");
                ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
                producer.send(record);
            }
            producer.close();
        });
    } catch (Exception ex) {
        LOG.error("Error in pushing message to Kafka", ex);
    } finally {
        // Setting back to original context loader, otherwise will get treenode issue
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}



Wednesday, May 17, 2023

Kafka push from a Spark DataFrame using the Java API


public SparkSession getSparkSession() {
    if (sparkSession == null) {
        sparkSession = SparkSession
            .builder()
            .master("local[2]")
            .appName("testspark")
            .getOrCreate();

        sparkSession.sparkContext().setLogLevel("error");
    }
    return sparkSession;
}


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.basan</groupId>
  <artifactId>testspark</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spark.version>2.3.2</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <!-- <scope>provided</scope> -->
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <!-- <scope>provided</scope> -->
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_2.11</artifactId>
      <!-- <scope>provided</scope> -->
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.1</version>
    </dependency>

    <!-- Needed to avoid "Failed to find data source: kafka" -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
      <version>2.3.1</version>
    </dependency>
  </dependencies>

</project>



package com.tgt.de.experiments;

import com.tgt.de.experiments.SparkUtils.SparkInitializer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class KafkaPush {

    Dataset<Row> createDataFrame() {
        SparkSession sparkSession = SparkInitializer.getInstance().getSparkSession();

        List<Row> list = new ArrayList<Row>();
        list.add(RowFactory.create("one", "onevalue"));
        list.add(RowFactory.create("two", "twovalue"));
        list.add(RowFactory.create("three", "threevalue"));
        list.add(RowFactory.create("four", "fourvalue"));
        List<org.apache.spark.sql.types.StructField> listOfStructField =
                new ArrayList<org.apache.spark.sql.types.StructField>();
        listOfStructField.add(DataTypes.createStructField("key", DataTypes.StringType, true));
        listOfStructField.add(DataTypes.createStructField("value", DataTypes.StringType, true));
        StructType structType = DataTypes.createStructType(listOfStructField);
        Dataset<Row> data = sparkSession.createDataFrame(list, structType);
        data.show();
        return data;
    }

    void pushToKafka(Dataset<Row> inputDF) {

        inputDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                .write()
                .format("kafka")
                .option("topic", "kafka-writer")
                .option("kafka." + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "basant.com:9093")
                .option("kafka.client.id", "kelsav3updated")
                // The serializer options below are ignored by the Spark Kafka sink;
                // Spark writes the STRING-cast key/value columns itself.
                .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
                .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
                .option("kafka." + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/basan/truststore.jks")
                .option("kafka." + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/basan/keystore.jks")
                .option("kafka." + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "oooooo")
                .option("kafka." + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "tttttt")
                .option("kafka." + SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2")
                .option("kafka." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
                .option("kafka." + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, " ")
                .option("kafka." + ProducerConfig.ACKS_CONFIG, "all")
                .save();
    }

}

Tuesday, December 13, 2022

Array of structs

 scala> import spark.implicits._

import spark.implicits._


scala> import org.apache.spark.sql.types._

import org.apache.spark.sql.types._


scala> import org.apache.spark.sql._

import org.apache.spark.sql._


scala>  val arrayStructData = Seq(

     |       Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),

     |       Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),

     |       Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),

     |       Row("Washington",null)

     |     )

arrayStructData: Seq[org.apache.spark.sql.Row] = List([James,List([Java,XX,120], [Scala,XA,300])], [Michael,List([Java,XY,200], [Scala,XB,500])], [Robert,List([Java,XZ,400], [Scala,XC,250])], [Washington,null])



    val arrayStructSchema = new StructType().add("name",StringType)

      .add("booksIntersted",ArrayType(new StructType()

        .add("name",StringType)

        .add("author",StringType)

        .add("pages",IntegerType)))



arrayStructSchema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(booksIntersted,ArrayType(StructType(StructField(name,StringType,true), StructField(author,StringType,true), StructField(pages,IntegerType,true)),true),true))


scala>     val df = spark.createDataFrame(spark.sparkContext

     |         .parallelize(arrayStructData),arrayStructSchema)

df: org.apache.spark.sql.DataFrame = [name: string, booksIntersted: array<struct<name:string,author:string,pages:int>>]


scala>     df.printSchema()

root

 |-- name: string (nullable = true)

 |-- booksIntersted: array (nullable = true)

 |    |-- element: struct (containsNull = true)

 |    |    |-- name: string (nullable = true)

 |    |    |-- author: string (nullable = true)

 |    |    |-- pages: integer (nullable = true)



scala>     df.show(false)

+----------+-----------------------------------+

|name      |booksIntersted                     |

+----------+-----------------------------------+

|James     |[[Java, XX, 120], [Scala, XA, 300]]|

|Michael   |[[Java, XY, 200], [Scala, XB, 500]]|

|Robert    |[[Java, XZ, 400], [Scala, XC, 250]]|

|Washington|null                               |

+----------+-----------------------------------+
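
A common next step, not shown in the transcript above, is to explode the array so that each book becomes its own row; a short sketch against the same df:

import org.apache.spark.sql.functions.{col, explode_outer}

// explode_outer keeps rows like "Washington" whose array is null
val exploded = df
  .select(col("name"), explode_outer(col("booksIntersted")).as("book"))
  .select(col("name"), col("book.name").as("bookName"), col("book.author"), col("book.pages"))
exploded.show(false)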


Thursday, December 8, 2022

UnionByName with the empty dataframe

 import spark.implicits._

import org.apache.spark.sql.types._
import org.apache.spark.sql._

val data = Seq(("James","Sales",34), ("Michael","Sales",56),
("Robert","Sales",30), ("Maria","Finance",24) )
val df1 = data.toDF("name","dept","age")
df1.printSchema()



val schema = StructType(
StructField("name", StringType, true) ::
StructField("dept", StringType, false) ::
StructField("age", IntegerType, false) :: Nil)


val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)


val merged_df = df1.unionByName(df)
merged_df.show(false)

val merged_df2 = df.unionByName(df1)
merged_df2.show(false)
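
When the two DataFrames do not share the same set of columns, a plain unionByName fails; Spark 3.1+ adds an allowMissingColumns flag that fills the gaps with nulls instead. A small sketch with a hypothetical extra column:

import org.apache.spark.sql.functions.lit

// df1Extra has a "bonus" column that df lacks; missing columns become null in the union
val df1Extra = df1.withColumn("bonus", lit(100))
val mergedLoose = df1Extra.unionByName(df, allowMissingColumns = true)
mergedLoose.show(false)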

Tuesday, November 29, 2022

Writing empty dataframe spark

import spark.implicits._

// Seq() alone infers Seq[Nothing]; give the sequence an element type so toDF can find an encoder
val data = Seq.empty[(String, String, String, String)]
val columns = Seq("firstname", "lastname", "country", "state")
val df = data.toDF(columns: _*)

println("emptydataframe write succeed")

// spark.emptyDataFrame has zero columns; file sources typically reject writing an empty schema
val emptyDf = spark.emptyDataFrame
val path = "/Users/basan/Documents/sparktest/emptyFolder2"
emptyDf.write.format("orc").save(path)
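
By contrast, the four-column DataFrame built above has zero rows but a real schema, so writing it out is expected to succeed; a quick sketch (the output folder name is just an example):

// df has no rows but carries firstname/lastname/country/state, so ORC has columns to describe
df.printSchema()
df.write.format("orc").save("/Users/basan/Documents/sparktest/emptyFolderWithSchema")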


Thursday, November 10, 2022

CDAP connect to external DB

Add the following properties to cdap-site.xml:


<property>

    <name>data.storage.implementation</name>

    <value>postgresql</value>

    <description>

      PG as metadata store

    </description>

  </property>


  <property>

    <name>data.storage.sql.jdbc.connection.url</name>

    <value>jdbc:postgresql://localhost:5432/basanversion3</value>

    <description>

      PG JDBC details

    </description>

  </property>


  <property>

    <name>data.storage.sql.jdbc.driver.name</name>

    <value>org.postgresql.Driver</value>

    <description>

      PG jdbc driver

    </description>

  </property>


  <property>

    <name>data.storage.sql.jdbc.driver.external</name>

    <value>true</value>

    <description>

      Indicates whether the JDBC driver has to be loaded from an external directory.

      If true, then the JDBC driver directory has to be specified using

      "data.storage.sql.jdbc.driver.directory".

      If false, then the JDBC driver is present in the CDAP classpath.

      This config can only be used when the storage implementation is postgresql.

    </description>

  </property>



  <property>

    <name>data.storage.sql.jdbc.driver.directory</name>

    <value>/Users/basan/Documents/CDAP/git/postgres</value>

    <description>

      The base directory for storing JDBC driver jars.

      Sub-directory with the name that matches with the value of "data.storage.implementation" setting

      will be searched for the corresponding JDBC driver and

      dependencies jars to connect to the configured sql instance.

      The JDBC driver class to load has to be specified using "data.storage.sql.jdbc.driver.name".

      This config can only be used when the storage implementation is postgresql.

    </description>

  </property>



Create the database in Postgres with the name basanversion3.

Place the jar postgresql-42.2.24.jar in the location /Users/basan/Documents/CDAP/git/postgres.