Powered By Blogger

Wednesday, May 17, 2023

Pushing to Kafka from a Spark DataFrame using the Java API


/**
 * Lazily creates and returns the shared {@link SparkSession}.
 *
 * <p>The session runs a local master with two threads ({@code local[2]}) under the
 * application name {@code testspark}. The method is {@code synchronized} because the
 * original check-then-act lazy initialization was not thread-safe: two threads could
 * race past the {@code null} check and initialize concurrently.
 *
 * @return the singleton {@code SparkSession}, created on first call
 */
public synchronized SparkSession getSparkSession() {
    if (sparkSession == null) {
        sparkSession = SparkSession
                .builder()
                .master("local[2]")
                .appName("testspark")
                .getOrCreate();

        // Reduce console noise from Spark's default INFO logging.
        // NOTE(review): Spark upper-cases/validates the level internally, so the
        // lowercase "error" is accepted.
        sparkSession.sparkContext().setLogLevel("error");
    }
    return sparkSession;
}


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.basan</groupId>
    <artifactId>testspark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spark.version>2.3.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <!-- <scope>provided</scope> -->
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <!-- <scope>provided</scope> -->
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_2.11</artifactId>
            <!-- <scope>provided</scope> -->
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.1</version>
        </dependency>

        <!-- Required for the "kafka" DataFrame sink; without it Spark fails at
             runtime with "Failed to find data source: kafka". Pinned to
             ${spark.version} so it cannot drift out of sync with the other
             Spark modules (it was previously hard-coded to 2.3.1). -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

</project>



package com.tgt.de.experiments;

import com.tgt.de.experiments.SparkUtils.SparkInitializer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class KafkaPush {


Dataset<Row> createDataFrame(){
Dataset<Row> inputDataSet = null;
SparkSession sparkSession = SparkInitializer.getInstance().getSparkSession();

List<Row> list = new ArrayList<Row>();
list.add(RowFactory.create("one" , "onevalue"));
list.add(RowFactory.create("two", "twovalue"));
list.add(RowFactory.create("three", "threevalue"));
list.add(RowFactory.create("four", "fourvalue"));
List<org.apache.spark.sql.types.StructField> listOfStructField =
new ArrayList<org.apache.spark.sql.types.StructField>();
listOfStructField.add
(DataTypes.createStructField("key", DataTypes.StringType, true));
listOfStructField.add
(DataTypes.createStructField("value", DataTypes.StringType, true));
StructType structType = DataTypes.createStructType(listOfStructField);
Dataset<Row> data = sparkSession.createDataFrame(list, structType);
data.show();
return data;

}

void pushTokafka( Dataset<Row> inputDF){

inputDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("topic","kafka-writer" )
.option("kafka."+ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "basant.com:9093")
.option("kafka.client.id", "kelsav3updated")

.option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
.option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
.option("kafka."+SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/basan/truststore.jks")
.option("kafka."+SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/basan/keystore.jks")
.option("kafka."+SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "oooooo")
.option("kafka."+SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "tttttt)
.option("kafka."+SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2")
.option("kafka."+CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
.option("kafka."+SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, " ")
.option("kafka."+ProducerConfig.ACKS_CONFIG, "all")
.save();

}


}

No comments:

Post a Comment