Powered By Blogger

Tuesday, July 2, 2019

Kafka Producer in main program


package com.tgt.hdfsToKafka;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {

public static void main(String[] args) throws IOException {
try {
Producer producer = createProducer();
String message = readFile();

ProducerRecord record = new ProducerRecord<>("scm-testing",
"testmessagekey", message);

producer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

private static Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"broker:9093");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "hdfsToKafka");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// props.put("ssl.truststore.location",
// "/Users/basan/Documents/JavaProjects/hdfsToKafka/src/main/resources/client.truststore.jks");
props.put("ssl.truststore.location", "/Users/basan/Desktop/asfasf/kafka_cert/dddddddclient.truststore.jks");
props.put("ssl.truststore.password", "jansfjjasjfjasjfjj);
// props.put("ssl.keystore.location",
// "/Users/basan/Documents/JavaProjects/hdfsToKafka/src/main/resources/ffods-stg.target.com.jks");
props.put("ssl.keystore.location", "/Users/basan/Desktop/asfasf/kafka_cert/serversfasfasfasfasf.keystore.jks");
props.put("ssl.keystore.password", "nasf as f n n n");
props.put("security.protocol", "SSL");
props.put("ssl.protocol", "TLSv1.2");
return new KafkaProducer<>(props);
}
static String readFile() throws IOException{
String contents = new String(Files.readAllBytes(Paths.get("/Users/basan/Documents/workspace-sts-3.8.4.RELEASE/hdfsToKafka/sample.json")));

return contents;
/*InputStream is = new FileInputStream("/Users/basan/Documents/workspace-sts-3.8.4.RELEASE/hdfsToKafka/sample.json");
BufferedReader buf = new BufferedReader(new InputStreamReader(is));
       
String line = buf.readLine();
StringBuilder sb = new StringBuilder();
       
while(line != null){
  sb.append(line);
  line = buf.readLine();
}
       
String fileAsString = sb.toString();
System.out.println("Contents : " + fileAsString);
return fileAsString;*/
}

}

No comments:

Post a Comment