
Friday, May 26, 2023

Kafka sink using Spark Java - in CDAP

// With the Spark Kafka sink you cannot set a StringSerializer; Spark serializes
// the key/value columns itself, so cast them to STRING (or BINARY) instead.

DataFrameWriter<Row> dataFrameWriter = keyValueDataSet
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .write()
        .format("kafka")
        .option("topic", config.topicName)
        .option("kafka.bootstrap.servers", config.broker)
        .option("kafka.client.id", "basankafka")
        .option("kafka.acks", "all");

if (config.sslEnabled) {
    LOG.info("Apply SSL configurations");
    // The blank endpoint identification algorithm is intended to skip hostname verification.
    dataFrameWriter = dataFrameWriter
            .option("kafka.ssl.truststore.location", config.trustStoreLocation)
            .option("kafka.ssl.keystore.location", config.keyStoreLocation)
            .option("kafka.ssl.keystore.password", keyStore)
            .option("kafka.ssl.truststore.password", trustStore)
            .option("kafka.ssl.protocol", "TLSv1.2")
            .option("kafka.security.protocol", "SSL")
            .option("kafka.ssl.endpoint.identification.algorithm", " ");
}
if (producerPropsMap != null && !producerPropsMap.isEmpty()) {
    LOG.info("Apply additional configurations");
    dataFrameWriter = dataFrameWriter.options(producerPropsMap);
}
dataFrameWriter.save();
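
For reference, a minimal sketch of how producerPropsMap might be assembled; the specific entries here are illustrative, but each key must carry the "kafka." prefix or Spark will not forward it to the underlying producer:

// Illustrative extras only; any valid producer config works when prefixed with "kafka.".
java.util.Map<String, String> producerPropsMap = new java.util.HashMap<>();
producerPropsMap.put("kafka.compression.type", "snappy");
producerPropsMap.put("kafka.retries", "3");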


<!-- Exclude the transitive kafka-clients from the Spark artifacts so that the
     explicitly pinned version below is the only one on the classpath. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-common_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
</dependency>
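
To double-check that the exclusions worked and the pinned client is the one actually loaded, a quick sketch (assuming a kafka-clients 2.x jar on the classpath):

// Logs the kafka-clients version actually loaded; with the exclusions
// above this should print 2.1.1.
System.out.println("kafka-clients version: "
        + org.apache.kafka.common.utils.AppInfoParser.getVersion());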


// @TODO delete this method after testing
private void sendMessageDirectly(String topicName, String key, String message, String keyStore, String trustStore) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", config.broker);
    // Plain producer properties do not take the "kafka." prefix; that prefix is only for Spark options.
    properties.put("client.id", "basanid");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("ssl.truststore.location", config.trustStoreLocation);
    properties.put("ssl.keystore.location", config.keyStoreLocation);
    properties.put("ssl.keystore.password", keyStore);
    properties.put("ssl.truststore.password", trustStore);
    properties.put("ssl.protocol", "TLSv1.2");
    properties.put("security.protocol", "SSL");
    // Blank value is intended to skip hostname verification.
    properties.put("ssl.endpoint.identification.algorithm", " ");
    properties.put("acks", "all");

    LOG.info("Sending the message to the Kafka topic");
    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    // Set the context loader to fix an issue with the partitioner.
    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
    final Producer<String, String> producer = new KafkaProducer<>(properties);
    try {
        producer.send(new ProducerRecord<>(topicName, key, message), (recordMetadata, exception) -> {
            if (exception == null) {
                LOG.info("Data Quality Plugin: Successfully sent the details as: \n" +
                        "Topic: " + recordMetadata.topic() + "\n" +
                        "Partition: " + recordMetadata.partition() + "\n" +
                        "Offset: " + recordMetadata.offset() + "\n" +
                        "Timestamp: " + recordMetadata.timestamp());
            } else {
                LOG.error("Sink Plugin: could not send message {}", exception.getMessage());
            }
        });
    } finally {
        // Restore the original context loader, otherwise we hit a TreeNode issue.
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        producer.flush();
        producer.close();
    }
}
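
A hypothetical invocation; the key and message literals are placeholders, and keyStorePass / trustStorePass stand in for whatever the plugin config supplies:

// Placeholder values for a quick smoke test.
sendMessageDirectly(config.topicName, "test-key", "hello from basan", keyStorePass, trustStorePass);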

// @TODO delete this method after testing
void writeToKafkaByPartition(Dataset<Row> dataset, String keyStore, String trustStore) {
    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    // Set the context loader to fix an issue with the partitioner.
    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
    Properties properties = new Properties();
    properties.put("bootstrap.servers", config.broker);
    properties.put("client.id", "basanid");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("ssl.truststore.location", config.trustStoreLocation);
    properties.put("ssl.keystore.location", config.keyStoreLocation);
    properties.put("ssl.keystore.password", keyStore);
    properties.put("ssl.truststore.password", trustStore);
    properties.put("ssl.protocol", "TLSv1.2");
    properties.put("security.protocol", "SSL");
    properties.put("ssl.endpoint.identification.algorithm", " ");
    properties.put("acks", "all");
    try {
        // One producer per partition; java.util.Properties is Serializable, so the closure can capture it.
        dataset.toJavaRDD().foreachPartition((VoidFunction<Iterator<Row>>) rows -> {
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
                while (rows.hasNext()) {
                    Row row = rows.next();
                    String key = row.getAs("key");
                    String value = row.getAs("value");
                    producer.send(new ProducerRecord<>(config.topicName, key, value));
                }
            }
        });
    } catch (Exception ex) {
        LOG.error("Error in pushing message to Kafka", ex);
    } finally {
        // Restore the original context loader, otherwise we hit a TreeNode issue.
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
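
And a hypothetical call, reusing the same key/value projection the sink path uses (the password variables are placeholders):

// The dataset must expose string "key" and "value" columns.
writeToKafkaByPartition(
        keyValueDataSet.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"),
        keyStorePass, trustStorePass);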



Wednesday, May 17, 2023

Kafka push from a Spark DataFrame using the Java API


public SparkSession getSparkSession() {
    if (sparkSession == null) {
        sparkSession = SparkSession
                .builder()
                .master("local[2]")
                .appName("testspark")
                .getOrCreate();

        sparkSession.sparkContext().setLogLevel("ERROR");
    }
    return sparkSession;
}
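
For context, a sketch of how this lazily created singleton is consumed elsewhere (SparkInitializer.getInstance() is the accessor used by the KafkaPush class further down):

// Subsequent calls return the same session instance.
SparkSession spark = SparkInitializer.getInstance().getSparkSession();
System.out.println("Spark version: " + spark.version());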


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.basan</groupId>
    <artifactId>testspark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spark.version>2.3.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <!-- <scope>provided</scope> -->
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <!-- <scope>provided</scope> -->
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_2.11</artifactId>
            <!-- <scope>provided</scope> -->
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.1</version>
        </dependency>

        <!-- Without this connector the write fails with: Failed to find data source: kafka. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

</project>



package com.tgt.de.experiments;

import com.tgt.de.experiments.SparkUtils.SparkInitializer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class KafkaPush {


Dataset<Row> createDataFrame() {
    SparkSession sparkSession = SparkInitializer.getInstance().getSparkSession();

    List<Row> list = new ArrayList<Row>();
    list.add(RowFactory.create("one", "onevalue"));
    list.add(RowFactory.create("two", "twovalue"));
    list.add(RowFactory.create("three", "threevalue"));
    list.add(RowFactory.create("four", "fourvalue"));

    List<org.apache.spark.sql.types.StructField> listOfStructField =
            new ArrayList<org.apache.spark.sql.types.StructField>();
    listOfStructField.add(DataTypes.createStructField("key", DataTypes.StringType, true));
    listOfStructField.add(DataTypes.createStructField("value", DataTypes.StringType, true));
    StructType structType = DataTypes.createStructType(listOfStructField);

    Dataset<Row> data = sparkSession.createDataFrame(list, structType);
    data.show();
    return data;
}

void pushToKafka(Dataset<Row> inputDF) {
    // Note: the Spark Kafka sink handles serialization itself, so the
    // key.serializer / value.serializer options are not honored and are omitted here.
    inputDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
            .write()
            .format("kafka")
            .option("topic", "kafka-writer")
            .option("kafka." + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "basan.com:9093")
            .option("kafka.client.id", "kelsav3updated")
            .option("kafka." + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/basan/truststore.jks")
            .option("kafka." + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/basan/keystore.jks")
            .option("kafka." + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "oooooo")
            .option("kafka." + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "tttttt")
            .option("kafka." + SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2")
            .option("kafka." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
            .option("kafka." + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, " ")
            .option("kafka." + ProducerConfig.ACKS_CONFIG, "all")
            .save();
}


}
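
To verify the push, the same connector supports a batch read of the topic. This is a sketch assuming the same broker and topic as pushToKafka(); the kafka.ssl.* options are omitted for brevity but would be needed against an SSL listener:

// Read the topic back and print the string key/value pairs.
Dataset<Row> readBack = SparkInitializer.getInstance().getSparkSession()
        .read()
        .format("kafka")
        .option("kafka." + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "basan.com:9093")
        .option("subscribe", "kafka-writer")
        .option("startingOffsets", "earliest")
        .load();
readBack.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(false);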