
Friday, May 26, 2023

Kafka sink using Spark Java in CDAP

// The Spark Kafka sink always uses its own ByteArraySerializer, so you cannot set a
// StringSerializer on it; instead, cast the key and value columns to STRING (or BINARY) first.

DataFrameWriter<Row> dataFrameWriter = keyValueDataSet
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .write()
        .format("kafka")
        .option("topic", config.topicName)
        // producer properties must carry the "kafka." prefix so Spark passes them through
        .option("kafka.bootstrap.servers", config.broker)
        .option("kafka.client.id", "basankafka")
        .option("kafka.acks", "all");

if (config.sslEnabled) {
    LOG.info("Applying SSL configurations");
    dataFrameWriter = dataFrameWriter
            .option("kafka.ssl.truststore.location", config.trustStoreLocation)
            .option("kafka.ssl.keystore.location", config.keyStoreLocation)
            .option("kafka.ssl.keystore.password", keyStore)
            .option("kafka.ssl.truststore.password", trustStore)
            .option("kafka.ssl.protocol", "TLSv1.2")
            .option("kafka.security.protocol", "SSL")
            // an empty value disables hostname verification
            .option("kafka.ssl.endpoint.identification.algorithm", "");
}
if (producerPropsMap != null && !producerPropsMap.isEmpty()) {
    LOG.info("Applying additional configurations");
    dataFrameWriter = dataFrameWriter.options(producerPropsMap);
}
dataFrameWriter.save();
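
The writer above assumes keyValueDataSet already exposes string-compatible key and value columns. A minimal sketch of deriving such a dataset from an arbitrary input Dataset<Row> could look like the following; the column name "id" and the use of to_json are assumptions for illustration, not part of the original plugin.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

// Hypothetical mapping: use the "id" column as the Kafka key and pack
// every column of the row into a JSON string for the value.
Dataset<Row> keyValueDataSet = sourceDataset
        .withColumn("key", col("id").cast("string"))
        .withColumn("value", to_json(struct(col("*"))))
        .select("key", "value");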


<!-- Exclude the transitive kafka-clients pulled in by the Spark artifacts so that
     exactly one explicit version (2.1.1, declared below) ends up on the classpath. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-common_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
</dependency>
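
A mismatched kafka-clients version on the classpath is a common cause of NoSuchMethodError at runtime, so it can be worth logging which version was actually loaded. AppInfoParser ships with kafka-clients itself; where this log line lives is an assumption, not part of the original plugin.

import org.apache.kafka.common.utils.AppInfoParser;

// Should report 2.1.1 if the exclusions above took effect
LOG.info("Loaded kafka-clients version: {}", AppInfoParser.getVersion());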


// TODO: delete this method after testing
private void sendMessageDirectly(String topicName, String key, String message, String keyStore, String trustStore) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", config.broker);
    // plain producer properties are not prefixed with "kafka." (that prefix is only for Spark options)
    properties.put("client.id", "basanid");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("ssl.truststore.location", config.trustStoreLocation);
    properties.put("ssl.keystore.location", config.keyStoreLocation);
    properties.put("ssl.keystore.password", keyStore);
    properties.put("ssl.truststore.password", trustStore);
    properties.put("ssl.protocol", "TLSv1.2");
    properties.put("security.protocol", "SSL");
    // an empty value disables hostname verification
    properties.put("ssl.endpoint.identification.algorithm", "");
    properties.put("acks", "all");


LOG.info("Sending the message to kafka topic");
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
//Setting context loader to fix issue with partitioner
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
final Producer<String, String> producer = new KafkaProducer<>(properties);
try {
producer.send(new ProducerRecord<>(topicName, key, message), (recordMetadata, exception) -> {

if (exception == null) {
LOG.info("Data Quality Plugin: Successfully sent the details as: \n" +
"Topic:" + recordMetadata.topic() + "\n" +
"Partition:" + recordMetadata.partition() + "\n" +
"Offset" + recordMetadata.offset() + "\n" +
"Timestamp" + recordMetadata.timestamp());
} else {
LOG.error("Sink Plugin: could not send message {}", exception.getMessage());
}
});
} finally {
//Setting back to original context loader, otherwise will get treenode issue
Thread.currentThread().setContextClassLoader(contextClassLoader);
if (Objects.nonNull(producer)) {
producer.flush();
producer.close();
}
}

}
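
Both test helpers repeat the same context-class-loader dance. If they were to stay, a small wrapper could remove the duplication; the helper name withPluginClassLoader is invented here for illustration and is not part of the plugin.

// Hypothetical helper: run an action with the plugin's class loader installed
// as the thread context class loader, restoring the original one afterwards.
private void withPluginClassLoader(Runnable action) {
    ClassLoader original = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
    try {
        action.run();
    } finally {
        Thread.currentThread().setContextClassLoader(original);
    }
}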

// TODO: delete this method after testing
void writeToKafkaByPartition(Dataset<Row> dataset, String keyStore, String trustStore) {
    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    // Swap in the plugin's class loader so the producer can locate its default partitioner
    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
    Properties properties = new Properties();
    properties.put("bootstrap.servers", config.broker);
    properties.put("client.id", "basanid");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("ssl.truststore.location", config.trustStoreLocation);
    properties.put("ssl.keystore.location", config.keyStoreLocation);
    properties.put("ssl.keystore.password", keyStore);
    properties.put("ssl.truststore.password", trustStore);
    properties.put("ssl.protocol", "TLSv1.2");
    properties.put("security.protocol", "SSL");
    // an empty value disables hostname verification
    properties.put("ssl.endpoint.identification.algorithm", "");
    properties.put("acks", "all");
    try {
        dataset.toJavaRDD().foreachPartition((VoidFunction<Iterator<Row>>) rows -> {
            // one producer per partition; created on the executor, so the
            // Properties object must be serializable (java.util.Properties is)
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            while (rows.hasNext()) {
                Row row = rows.next();
                String key = row.getAs("key");
                String value = row.getAs("value");
                producer.send(new ProducerRecord<>(config.topicName, key, value));
            }
            producer.close();
        });
    } catch (Exception ex) {
        LOG.error("Error in pushing messages to Kafka", ex);
    } finally {
        // Restore the original context class loader, otherwise Spark later fails with a TreeNode error
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
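
One caveat with the per-partition loop above: if a send throws, the producer is never closed. A hedged variant of the loop body using try-with-resources (KafkaProducer implements Closeable) plus a callback that logs per-record failures; the callback and log text are illustrative assumptions, not taken from the original plugin.

dataset.toJavaRDD().foreachPartition((VoidFunction<Iterator<Row>>) rows -> {
    // try-with-resources flushes and closes the producer even when a send
    // fails partway through the partition
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
        while (rows.hasNext()) {
            Row row = rows.next();
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(config.topicName, row.getAs("key"), row.getAs("value"));
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    LOG.error("Failed to send record to {}", config.topicName, exception);
                }
            });
        }
    }
});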


