
Friday, May 10, 2024

Download file from SharePoint site



package com.tgt.ding.sp.reader;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.json.JSONObject;

public class SharePointDownload {

private static final String SHAREPOINT_SITE = "https://basanonline.sharepoint.com/sites/ogrp-ds-basan";
private static final String DOCUMENT_LIBRARY = "Documents";
private static final String FILE_NAME = "Book.xlsx";
private static final String CLIENT_ID = "your-client-id";
private static final String CLIENT_SECRET = "your-client-secret";
private static final String ACCESS_TOKEN = getAccessToken(CLIENT_ID, CLIENT_SECRET); // Implement access token retrieval logic

public static void main(String[] args) throws URISyntaxException, IOException {
String downloadUrl = buildDownloadUrl(SHAREPOINT_SITE, DOCUMENT_LIBRARY, FILE_NAME);
downloadFile(downloadUrl, ACCESS_TOKEN);
}

// Builds the REST download URL. The server-relative path is hard-coded for this example;
// the documentLibrary and fileName parameters only show what a generic builder would take.
private static String buildDownloadUrl(String siteUrl, String documentLibrary, String fileName) {
return siteUrl + "/_api/web/GetFileByServerRelativePath(decodedurl='/sites/ogrp-ds-basan/Shared%20Documents/DataFolder/testCrawl/Book.xlsx')/$value";
}

private static void downloadFile(String downloadUrl, String accessToken) throws URISyntaxException, IOException {
HttpGet request = new HttpGet(new URI(downloadUrl));
request.addHeader("Authorization", "Bearer " + accessToken);

// try-with-resources closes the client, the response and the streams even on failure
try (CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(request)) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200) {
// Stream the response body straight to a local file
try (InputStream inputStream = response.getEntity().getContent();
FileOutputStream outputStream = new FileOutputStream(FILE_NAME)) {
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
}
System.out.println("File downloaded successfully!");
} else {
System.out.println("Error downloading file: " + statusCode);
}
}
}

// Replace this with your access token retrieval logic using Azure AD / ACS
// (a sketch of the client-credentials call is shown after the token curl below)
private static String getAccessToken(String clientId, String clientSecret) {
// Implement logic to get access token using Azure AD and OAuth 2.0 flow
return "token";
}
}



Get tenantid and resourceid

curl --request GET \

  --url https://basanonline.sharepoint.com/sites/ogrp-ds-basan/_vti_bin/client.svc \

  --header 'Authorization: Bearer bearer' \

  --header 'User-Agent: insomnia/8.6.1'

Look into the WWW-Authenticate header of the 401 response: realm carries the tenant id and client_id carries the resource id.

Bearer realm="tenantid",client_id="clientidresource",trusted_issuers="tamper@*,D3776938-3DBA-481F-A652-4BEDFCAB7CD8@*,https://sts.windows.net/*/,https://login.microsoftonline.com/*/v2.0,tamperce00-000000000000@tenantid",authorization_uri="https://login.microsoftonline.com/common/oauth2/authorize"
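
The same discovery call can be made from Java. Below is a minimal, self-contained sketch (not part of the original post; the class name and the output handling are mine) that uses the same Apache HttpClient library as the download class above: it sends the request with an empty bearer token and prints the WWW-Authenticate header of the 401 response.

import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class SharePointRealmDiscovery {

    public static void main(String[] args) throws Exception {
        HttpGet discovery = new HttpGet(
                "https://basanonline.sharepoint.com/sites/ogrp-ds-basan/_vti_bin/client.svc");
        // An empty/invalid bearer token forces a 401 whose WWW-Authenticate header
        // carries realm (tenant id) and client_id (resource id)
        discovery.addHeader("Authorization", "Bearer");

        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(discovery)) {
            System.out.println("Status: " + response.getStatusLine().getStatusCode());
            Header wwwAuthenticate = response.getFirstHeader("WWW-Authenticate");
            System.out.println("WWW-Authenticate: "
                    + (wwwAuthenticate == null ? "<missing>" : wwwAuthenticate.getValue()));
        }
    }
}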



Getting a token using the clientid, clientsecret, tenantid and clientidresource


curl --request POST \

  --url https://accounts.accesscontrol.windows.net/tenantid/tokens/OAuth/2 \

  --header 'Content-Type: application/x-www-form-urlencoded' \

  --data grant_type=client_credentials \

  --data client_id=clientid@tenantid \

  --data 'client_secret=ClientSECRET' \

  --data resource=clientidresource/basanonline.sharepoint.com@tenantid


 https://sharepointcass.com/2021/04/01/sharepoint-online-rest-apis-part-iii-pages/
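
To plug this into the getAccessToken stub in the download class above, here is a hedged Java sketch of the same client-credentials call against the ACS endpoint used by the curl. The class name, the extra tenantId/resourceId/host parameters and the use of org.json are illustrative; the bearer token comes back in the access_token field of the JSON response.

import java.util.Arrays;

import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.json.JSONObject;

public class SharePointTokenClient {

    // Sketch only: parameter names are illustrative, the values come from the discovery step above
    public static String getAccessToken(String tenantId, String clientId, String clientSecret,
                                        String resourceId, String host) throws Exception {
        HttpPost tokenRequest = new HttpPost(
                "https://accounts.accesscontrol.windows.net/" + tenantId + "/tokens/OAuth/2");
        tokenRequest.setEntity(new UrlEncodedFormEntity(Arrays.asList(
                new BasicNameValuePair("grant_type", "client_credentials"),
                new BasicNameValuePair("client_id", clientId + "@" + tenantId),
                new BasicNameValuePair("client_secret", clientSecret),
                // resource = <resource id>/<sharepoint host>@<tenant id>, exactly as in the curl
                new BasicNameValuePair("resource", resourceId + "/" + host + "@" + tenantId))));

        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(tokenRequest)) {
            String body = EntityUtils.toString(response.getEntity());
            // The token endpoint responds with JSON; the bearer token is in access_token
            return new JSONObject(body).getString("access_token");
        }
    }
}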

Saturday, April 13, 2024

Spark consumer for an Avro topic

 ./spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,\

org.apache.spark:spark-avro_2.12:3.5.1,\

io.confluent:kafka-schema-registry-client:7.3.2,\

io.confluent:kafka-avro-serializer:7.3.2,\

io.confluent:kafka-streams-avro-serde:7.3.2 \

--repositories http://packages.confluent.io/maven/



import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.avro.functions._

import scala.collection.JavaConverters._



val schemaRegistryAddr = "https://schemaregistry.dev.basan.com"

spark.conf.set("schema.registry.url", schemaRegistryAddr)

val subjectName = "basan-events-value"

val schemaRegistryOptions = Map(

      "avroSchemaEvolutionMode" -> "none",

      "mode" -> "PERMISSIVE")


val kafkaDF: DataFrame = spark.readStream

        .format("kafka")

        .option("kafka.bootstrap.servers", "kafka-basan.com:9093")

        .option("kafka.security.protocol", "SSL")

        .option("kafka.ssl.truststore.location", "/basan/client.truststore.jks")

        .option("kafka.ssl.keystore.location", "/basan/server.keystore.jks")

        .option("startingOffsets", "earliest")

        .option("subscribe", "basan-events")

        .option("kafka.ssl.keystore.password", "replacewithcorrectpassword")

        .option("kafka.ssl.truststore.password", "replacewithcorrectpassword")

        .option("kafka.ssl.endpoint.identification.algorithm", "")

        .option("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")

        .option("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")

.load()


//kafkaDF.limit(10).writeStream.format("console").option("truncate", true).start().awaitTermination()


import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}


import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

import io.confluent.kafka.schemaregistry.client.SchemaMetadata

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient


val client: SchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryAddr, 500) // 500 = identity-map capacity (max schemas cached locally)


    // Get the latest schema metadata by subject

val schemaMetadata: SchemaMetadata = client.getLatestSchemaMetadata(subjectName)

val schemaId: Int = schemaMetadata.getId()

val schemaString: String = client.getById(schemaId).toString()


//val processedDf = kafkaDF.select(from_avro($"value", schemaString,schemaRegistryOptions.asJava).as("value"))
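// Confluent-framed Avro records start with a 5-byte prefix (1 magic byte + a 4-byte schema id),
// so the working select below strips it with substring(value, 6) (Spark SQL substring is 1-based)
// before handing the raw Avro bytes to from_avro.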

val processedDf2 = kafkaDF.selectExpr("substring(value, 6) as value").select(from_avro($"value", schemaString,schemaRegistryOptions.asJava).as("value"))

//val processedDf = kafkaDF.select(from_avro($"value", schemaString).as("value"))

//val processedDf = kafkaDF.select(from_avro($"value", responseJson,schemaRegistryOptions.asJava).as("value"))


//val processedDf = kafkaDF.select(from_avro($"value", schemaString,schemaRegistryOptions.asJava).as("value"))

//processedDf.limit(2).writeStream.format("console").option("truncate", false).start().awaitTermination()



processedDf2.limit(500).writeStream.

option("path", "/Users/basan/Documents/dataingestion/avroingestion/data").

option("checkpointLocation", "/Users/basan/Documents/dataingestion/avroingestion/checkpointLocation").

format("json").start().awaitTermination()


Friday, May 26, 2023

Kafka sink using spark java - in cdap

 // With the Spark Kafka sink the StringSerializer cannot be set; cast the key and value to STRING instead

DataFrameWriter<Row> dataFrameWriter = keyValueDataSet.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("topic", config.topicName)
.option("kafka." + "bootstrap.servers", config.broker)
.option("kafka.client.id", "basankafka")
.option("kafka." + "acks", "all");

if (config.sslEnabled) {
LOG.info("Apply SSL configurations");
dataFrameWriter = dataFrameWriter.option("kafka." + "ssl.truststore.location", config.trustStoreLocation)
.option("kafka." + "ssl.keystore.location", config.keyStoreLocation)
.option("kafka." + "ssl.keystore.password", keyStore)
.option("kafka." + "ssl.truststore.password", trustStore)
.option("kafka." + "ssl.protocol", "TLSv1.2")
.option("kafka." + "security.protocol", "SSL")
.option("kafka." + "ssl.endpoint.identification.algorithm", " ");
}
if (producerPropsMap != null && producerPropsMap.size() > 0) {
LOG.info("Apply additional configurations");
dataFrameWriter = dataFrameWriter.options(producerPropsMap);
}
dataFrameWriter.save();


<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>


//@TODO delete this method after testing
private void sendMessageDirectly(String topicName, String key, String message, String keyStore, String trustStore) {
Properties properties = new Properties();
properties.put("bootstrap.servers", config.broker);
properties.put("kafka.client.id", "basanid");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("ssl.truststore.location", config.trustStoreLocation);
properties.put("ssl.keystore.location", config.keyStoreLocation);
properties.put("ssl.keystore.password", keyStore);
properties.put("ssl.truststore.password", trustStore);
properties.put("ssl.protocol", "TLSv1.2");
properties.put("security.protocol", "SSL");
properties.put("ssl.endpoint.identification.algorithm", " ");
properties.put("acks", "all");


LOG.info("Sending the message to kafka topic");
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
//Setting context loader to fix issue with partitioner
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
final Producer<String, String> producer = new KafkaProducer<>(properties);
try {
producer.send(new ProducerRecord<>(topicName, key, message), (recordMetadata, exception) -> {

if (exception == null) {
LOG.info("Data Quality Plugin: Successfully sent the details as: \n" +
"Topic:" + recordMetadata.topic() + "\n" +
"Partition:" + recordMetadata.partition() + "\n" +
"Offset" + recordMetadata.offset() + "\n" +
"Timestamp" + recordMetadata.timestamp());
} else {
LOG.error("Sink Plugin: could not send message {}", exception.getMessage());
}
});
} finally {
//Setting back to original context loader, otherwise will get treenode issue
Thread.currentThread().setContextClassLoader(contextClassLoader);
if (Objects.nonNull(producer)) {
producer.flush();
producer.close();
}
}

}

//@TODO delete this method after testing
void writeToKafkaByPartition(Dataset<Row> dataset, String keyStore, String trustStore) {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
//Setting context loader to fix issue with partitioner
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
Properties properties = new Properties();
properties.put("bootstrap.servers", config.broker);
properties.put("kafka.client.id", "basanid");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("ssl.truststore.location", config.trustStoreLocation);
properties.put("ssl.keystore.location", config.keyStoreLocation);
properties.put("ssl.keystore.password", keyStore);
properties.put("ssl.truststore.password", trustStore);
properties.put("ssl.protocol", "TLSv1.2");
properties.put("security.protocol", "SSL");
properties.put("ssl.endpoint.identification.algorithm", " ");
properties.put("acks", "all");
try {
dataset.toJavaRDD().foreachPartition((VoidFunction<Iterator<Row>>) rows -> {
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
while (rows.hasNext()) {
Row row = rows.next();
String key = row.getAs("key");
String value = row.getAs("value");
ProducerRecord<String, String> record = new ProducerRecord<>(config.topicName, key, value);
producer.send(record);
}
producer.close();
});

} catch (Exception ex) {
LOG.error("Error in pushing message to Kafka", ex);
} finally {
//Setting back to original context loader, otherwise will get treenode issue
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}



Wednesday, May 17, 2023

Kafka push from Spark dataframe using the Java API


public SparkSession getSparkSession() {
if(sparkSession == null) {
sparkSession = SparkSession
.builder()
.master("local[2]")
.appName("testspark")
                .getOrCreate();

sparkSession.sparkContext().setLogLevel("error");
}
return sparkSession;
}


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.basan</groupId>
<artifactId>testspark</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spark.version>2.3.2</spark.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<!-- <scope>provided</scope> -->
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<!-- <scope>provided</scope> -->
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.11</artifactId>
<!-- <scope>provided</scope> -->
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>

<!-- Needed at runtime, otherwise: Failed to find data source: kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>

</project> 



package com.tgt.de.experiments;

import com.tgt.de.experiments.SparkUtils.SparkInitializer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class KafkaPush {


Dataset<Row> createDataFrame(){
SparkSession sparkSession = SparkInitializer.getInstance().getSparkSession();

List<Row> list = new ArrayList<Row>();
list.add(RowFactory.create("one" , "onevalue"));
list.add(RowFactory.create("two", "twovalue"));
list.add(RowFactory.create("three", "threevalue"));
list.add(RowFactory.create("four", "fourvalue"));
List<org.apache.spark.sql.types.StructField> listOfStructField =
new ArrayList<org.apache.spark.sql.types.StructField>();
listOfStructField.add
(DataTypes.createStructField("key", DataTypes.StringType, true));
listOfStructField.add
(DataTypes.createStructField("value", DataTypes.StringType, true));
StructType structType = DataTypes.createStructType(listOfStructField);
Dataset<Row> data = sparkSession.createDataFrame(list, structType);
data.show();
return data;

}

void pushTokafka( Dataset<Row> inputDF){

inputDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("topic", "kafka-writer")
.option("kafka." + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "basant.com:9093")
.option("kafka.client.id", "kelsav3updated")
// key/value serializers cannot be set on the Spark Kafka sink; the CAST to STRING above covers it
.option("kafka." + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/basan/truststore.jks")
.option("kafka." + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/basan/keystore.jks")
.option("kafka." + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "oooooo")
.option("kafka." + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "tttttt")
.option("kafka." + SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2")
.option("kafka." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
.option("kafka." + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, " ")
.option("kafka." + ProducerConfig.ACKS_CONFIG, "all")
.save();

}
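
// Hypothetical driver (not in the original post): builds the sample DataFrame above and pushes it,
// assuming SparkInitializer is configured and the broker/keystore values in pushTokafka are valid.
public static void main(String[] args) {
KafkaPush kafkaPush = new KafkaPush();
Dataset<Row> sampleDF = kafkaPush.createDataFrame();
kafkaPush.pushTokafka(sampleDF);
}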


}

Tuesday, December 13, 2022

Array of structs

 scala> import spark.implicits._

import spark.implicits._


scala> import org.apache.spark.sql.types._

import org.apache.spark.sql.types._


scala> import org.apache.spark.sql._

import org.apache.spark.sql._


scala>  val arrayStructData = Seq(

     |       Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),

     |       Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),

     |       Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),

     |       Row("Washington",null)

     |     )

arrayStructData: Seq[org.apache.spark.sql.Row] = List([James,List([Java,XX,120], [Scala,XA,300])], [Michael,List([Java,XY,200], [Scala,XB,500])], [Robert,List([Java,XZ,400], [Scala,XC,250])], [Washington,null])



    val arrayStructSchema = new StructType().add("name",StringType)

      .add("booksIntersted",ArrayType(new StructType()

        .add("name",StringType)

        .add("author",StringType)

        .add("pages",IntegerType)))



arrayStructSchema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(booksIntersted,ArrayType(StructType(StructField(name,StringType,true), StructField(author,StringType,true), StructField(pages,IntegerType,true)),true),true))


scala>     val df = spark.createDataFrame(spark.sparkContext

     |         .parallelize(arrayStructData),arrayStructSchema)

df: org.apache.spark.sql.DataFrame = [name: string, booksIntersted: array<struct<name:string,author:string,pages:int>>]


scala>     df.printSchema()

root

 |-- name: string (nullable = true)

 |-- booksIntersted: array (nullable = true)

 |    |-- element: struct (containsNull = true)

 |    |    |-- name: string (nullable = true)

 |    |    |-- author: string (nullable = true)

 |    |    |-- pages: integer (nullable = true)



scala>     df.show(false)

+----------+-----------------------------------+

|name      |booksIntersted                     |

+----------+-----------------------------------+

|James     |[[Java, XX, 120], [Scala, XA, 300]]|

|Michael   |[[Java, XY, 200], [Scala, XB, 500]]|

|Robert    |[[Java, XZ, 400], [Scala, XC, 250]]|

|Washington|null                               |

+----------+-----------------------------------+


Thursday, December 8, 2022

UnionByName with the empty dataframe

 import spark.implicits._

import org.apache.spark.sql.types._
import org.apache.spark.sql._

val data = Seq(("James","Sales",34), ("Michael","Sales",56),
("Robert","Sales",30), ("Maria","Finance",24) )
val df1 = data.toDF("name","dept","age")
df1.printSchema()



val schema = StructType(
StructField("name", StringType, true) ::
StructField("dept", StringType, false) ::
StructField("age", IntegerType, false) :: Nil)


val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)


val merged_df = df1.unionByName(df)
merged_df.show(false)

val merged_df2 = df.unionByName(df1)
merged_df2.show(false)

Tuesday, November 29, 2022

Writing an empty dataframe in Spark

 import spark.implicits._


val data = Seq.empty[(String, String, String, String)]

val columns = Seq("firstname","lastname","country","state")

val df = data.toDF(columns:_*)



println("emptydataframe write succeed")

val df = spark.emptyDataFrame

val path = "/Users/basan/Documents/sparktest/emptyFolder2"

df.write.format("orc").save(path)