//With the kafka sink cannot set the StringSerializer
DataFrameWriter<Row> dataFrameWriter = keyValueDataSet.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("topic", config.topicName)
.option("kafka." + "bootstrap.servers", config.broker)
.option("kafka.client.id", "basankafka")
.option("kafka." + "acks", "all");
if (config.sslEnabled) {
LOG.info("Apply SSL configurations");
dataFrameWriter = dataFrameWriter.option("kafka." + "ssl.truststore.location", config.trustStoreLocation)
.option("kafka." + "ssl.keystore.location", config.keyStoreLocation)
.option("kafka." + "ssl.keystore.password", keyStore)
.option("kafka." + "ssl.truststore.password", trustStore)
.option("kafka." + "ssl.protocol", "TLSv1.2")
.option("kafka." + "security.protocol", "SSL")
.option("kafka." + "ssl.endpoint.identification.algorithm", " ");
}
if (producerPropsMap != null && producerPropsMap.size() > 0) {
LOG.info("Apply additional configurations");
dataFrameWriter = dataFrameWriter.options(producerPropsMap);
}
dataFrameWriter.save();
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
//@TODO delete this method after testingprivate void sendMessageDirectly(String topicName, String key, String message, String keyStore, String trustStore) {Properties properties = new Properties();properties.put("bootstrap.servers", config.broker);properties.put("kafka.client.id", "basanid");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("ssl.truststore.location", config.trustStoreLocation);properties.put("ssl.keystore.location", config.keyStoreLocation);properties.put("ssl.keystore.password", keyStore);properties.put("ssl.truststore.password", trustStore);properties.put("ssl.protocol", "TLSv1.2");properties.put("security.protocol", "SSL");properties.put("ssl.endpoint.identification.algorithm", " ");properties.put("acks", "all");LOG.info("Sending the message to kafka topic");ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();//Setting context loader to fix issue with partitionerThread.currentThread().setContextClassLoader(getClass().getClassLoader());final Producer<String, String> producer = new KafkaProducer<>(properties);try {producer.send(new ProducerRecord<>(topicName, key, message), (recordMetadata, exception) -> {if (exception == null) {LOG.info("Data Quality Plugin: Successfully sent the details as: \n" +"Topic:" + recordMetadata.topic() + "\n" +"Partition:" + recordMetadata.partition() + "\n" +"Offset" + recordMetadata.offset() + "\n" +"Timestamp" + recordMetadata.timestamp());} else {LOG.error("Sink Plugin: could not send message {}", exception.getMessage());}});} finally {//Setting back to original context loader, otherwise will get treenode issueThread.currentThread().setContextClassLoader(contextClassLoader);if (Objects.nonNull(producer)) {producer.flush();producer.close();}}}//@TODO delete this method after testingvoid writeTokafaByPartition(Dataset<Row> dataset, String keyStore, String trustStore) {ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();//Setting context loader to fix issue with partitionerThread.currentThread().setContextClassLoader(getClass().getClassLoader());Properties properties = new Properties();properties.put("bootstrap.servers", config.broker);properties.put("kafka.client.id", "basanid");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("ssl.truststore.location", config.trustStoreLocation);properties.put("ssl.keystore.location", config.keyStoreLocation);properties.put("ssl.keystore.password", keyStore);properties.put("ssl.truststore.password", trustStore);properties.put("ssl.protocol", "TLSv1.2");properties.put("security.protocol", "SSL");properties.put("ssl.endpoint.identification.algorithm", " ");properties.put("acks", "all");try {dataset.toJavaRDD().foreachPartition((VoidFunction<Iterator<Row>>) rows -> {KafkaProducer<String, String> producer = new KafkaProducer<>(properties);while (rows.hasNext()) {Row row = rows.next();String key = row.getAs("key");String value = row.getAs("value");ProducerRecord<String, String> record = new ProducerRecord<>(config.topicName, key, value);producer.send(record);}producer.close();});} catch (Exception ex) {LOG.error("Error in pushing message to Kafka", ex);} finally {//Setting back to original context loader, otherwise will get treenode issueThread.currentThread().setContextClassLoader(contextClassLoader);}}