
Friday, May 10, 2024

Download a file from a SharePoint site



package com.tgt.ding.sp.reader;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.json.JSONObject;

public class SharePointDownload {

    private static final String SHAREPOINT_SITE = "https://basanonline.sharepoint.com/sites/ogrp-ds-basan";
    private static final String DOCUMENT_LIBRARY = "Documents";
    private static final String FILE_NAME = "Book.xlsx";
    private static final String CLIENT_ID = "your-client-id";
    private static final String CLIENT_SECRET = "your-client-secret";
    private static final String ACCESS_TOKEN = getAccessToken(CLIENT_ID, CLIENT_SECRET); // implement access token retrieval (see below)

    public static void main(String[] args) throws URISyntaxException, IOException {
        String downloadUrl = buildDownloadUrl(SHAREPOINT_SITE, DOCUMENT_LIBRARY, FILE_NAME);
        downloadFile(downloadUrl, ACCESS_TOKEN);
    }

    // Builds the REST endpoint that returns the raw file bytes ($value) for a
    // server-relative path. Note: the library and file name parameters are not
    // used here; the server-relative path is hardcoded.
    private static String buildDownloadUrl(String siteUrl, String documentLibrary, String fileName) {
        return siteUrl + "/_api/web/GetFileByServerRelativePath(decodedurl='/sites/ogrp-ds-basan/Shared%20Documents/DataFolder/testCrawl/Book.xlsx')/$value";
    }

    private static void downloadFile(String downloadUrl, String accessToken) throws URISyntaxException, IOException {
        HttpGet request = new HttpGet(new URI(downloadUrl));
        request.addHeader("Authorization", "Bearer " + accessToken);

        // try-with-resources closes the client, response and streams even on failure
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(request)) {
            int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode == 200) {
                try (InputStream inputStream = response.getEntity().getContent();
                     FileOutputStream outputStream = new FileOutputStream("Book.xlsx")) {
                    byte[] buffer = new byte[1024];
                    int bytesRead;
                    while ((bytesRead = inputStream.read(buffer)) != -1) {
                        outputStream.write(buffer, 0, bytesRead);
                    }
                }
                System.out.println("File downloaded successfully!");
            } else {
                System.out.println("Error downloading file: " + statusCode);
            }
        }
    }

    // Replace this with your access token retrieval logic using Azure AD / ACS.
    // The curl calls below show how to discover the tenant id and request the token.
    private static String getAccessToken(String clientId, String clientSecret) {
        // Implement logic to get the access token using the OAuth 2.0 client-credentials flow
        return "token";
    }
}



Get the tenant id and resource id

curl --request GET \
  --url https://basanonline.sharepoint.com/sites/ogrp-ds-basan/_vti_bin/client.svc \
  --header 'Authorization: Bearer bearer' \
  --header 'User-Agent: insomnia/8.6.1'

Look at the WWW-Authenticate header in the response:

Bearer realm="tenantid",client_id="clientidresource",trusted_issuers="tamper@*,D3776938-3DBA-481F-A652-4BEDFCAB7CD8@*,https://sts.windows.net/*/,https://login.microsoftonline.com/*/v2.0,tamperce00-000000000000@tenantid",authorization_uri="https://login.microsoftonline.com/common/oauth2/authorize"
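
If you prefer to do this lookup from Java instead of curl, here is a minimal sketch using the same Apache HttpClient as the download class above. It simply calls client.svc without a valid token and prints the WWW-Authenticate header of the 401 response; the DiscoverTenant class name is only for illustration.

// Minimal sketch: hit client.svc unauthenticated and read the WWW-Authenticate
// header of the 401 response. realm= carries the tenant id and client_id=
// carries the resource id used later in the token request.
import java.io.IOException;

import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class DiscoverTenant {

    public static void main(String[] args) throws IOException {
        HttpGet request = new HttpGet("https://basanonline.sharepoint.com/sites/ogrp-ds-basan/_vti_bin/client.svc");
        request.addHeader("Authorization", "Bearer"); // deliberately without a token

        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(request)) {
            Header header = response.getFirstHeader("WWW-Authenticate");
            if (header != null) {
                // Example value: Bearer realm="<tenantid>",client_id="<resourceid>",...
                System.out.println(header.getValue());
            }
        }
    }
}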



Get the token using the clientid, clientsecret, tenantid and clientidresource


curl --request POST \
  --url https://accounts.accesscontrol.windows.net/tenantid/tokens/OAuth/2 \
  --header 'Content-Type: application/x-www-form-urlencoded' \
  --data grant_type=client_credentials \
  --data client_id=clientid@tenantid \
  --data 'client_secret=ClientSECRET' \
  --data resource=clientidresource/basanonline.sharepoint.com@tenantid
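
The same token request can be wired into getAccessToken() of the download class above. A minimal sketch, assuming the ACS endpoint returns JSON with an access_token field (as in the curl response); AcsTokenClient, TENANT_ID, RESOURCE_ID and SP_HOST are placeholders you fill in from the WWW-Authenticate header, not values taken from the original class.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.json.JSONObject;

public class AcsTokenClient {

    // Placeholders: take these from the WWW-Authenticate header above.
    private static final String TENANT_ID = "tenantid";           // realm="..."
    private static final String RESOURCE_ID = "clientidresource"; // client_id="..."
    private static final String SP_HOST = "basanonline.sharepoint.com";

    static String getAccessToken(String clientId, String clientSecret) throws IOException {
        String tokenUrl = "https://accounts.accesscontrol.windows.net/" + TENANT_ID + "/tokens/OAuth/2";

        // Same form fields as the curl call above.
        List<NameValuePair> form = new ArrayList<>();
        form.add(new BasicNameValuePair("grant_type", "client_credentials"));
        form.add(new BasicNameValuePair("client_id", clientId + "@" + TENANT_ID));
        form.add(new BasicNameValuePair("client_secret", clientSecret));
        form.add(new BasicNameValuePair("resource", RESOURCE_ID + "/" + SP_HOST + "@" + TENANT_ID));

        HttpPost post = new HttpPost(tokenUrl);
        post.setEntity(new UrlEncodedFormEntity(form, StandardCharsets.UTF_8));

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(post)) {
            String body = EntityUtils.toString(response.getEntity());
            // The bearer token is in the access_token field of the JSON response.
            return new JSONObject(body).getString("access_token");
        }
    }
}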


 https://sharepointcass.com/2021/04/01/sharepoint-online-rest-apis-part-iii-pages/

Saturday, April 13, 2024

Spark consumer for an Avro topic

./spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,\
org.apache.spark:spark-avro_2.12:3.5.1,\
io.confluent:kafka-schema-registry-client:7.3.2,\
io.confluent:kafka-avro-serializer:7.3.2,\
io.confluent:kafka-streams-avro-serde:7.3.2 \
--repositories http://packages.confluent.io/maven/



import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://schemaregistry.dev.basan.com"
spark.conf.set("schema.registry.url", schemaRegistryAddr)

val subjectName = "basan-events-value"
val schemaRegistryOptions = Map(
  "avroSchemaEvolutionMode" -> "none",
  "mode" -> "PERMISSIVE")

// Read the raw topic over SSL. The Spark Kafka source always delivers key/value
// as binary, so the Confluent deserializer options are not needed here; the Avro
// payload is decoded further down with from_avro.
val kafkaDF: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-basan.com:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/basan/client.truststore.jks")
  .option("kafka.ssl.keystore.location", "/basan/server.keystore.jks")
  .option("startingOffsets", "earliest")
  .option("subscribe", "basan-events")
  .option("kafka.ssl.keystore.password", "replacewithcorrectpassword")
  .option("kafka.ssl.truststore.password", "replacewithcorrectpassword")
  .option("kafka.ssl.endpoint.identification.algorithm", "")
  .load()

// Quick sanity check of the raw records:
//kafkaDF.limit(10).writeStream.format("console").option("truncate", true).start().awaitTermination()

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient}

// Fetch the latest writer schema for the subject from the schema registry.
val client: SchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryAddr, 500)
val schemaMetadata: SchemaMetadata = client.getLatestSchemaMetadata(subjectName)
val schemaId: Int = schemaMetadata.getId()
val schemaString: String = client.getById(schemaId).toString()

// Messages written with the Confluent Avro serializer start with a 5-byte header
// (magic byte + 4-byte schema id); substring(value, 6) skips it before from_avro.
val processedDf2 = kafkaDF.selectExpr("substring(value, 6) as value")
  .select(from_avro($"value", schemaString, schemaRegistryOptions.asJava).as("value"))

// Other variants tried:
//val processedDf = kafkaDF.select(from_avro($"value", schemaString, schemaRegistryOptions.asJava).as("value"))
//val processedDf = kafkaDF.select(from_avro($"value", schemaString).as("value"))
//val processedDf = kafkaDF.select(from_avro($"value", responseJson, schemaRegistryOptions.asJava).as("value"))
//processedDf.limit(2).writeStream.format("console").option("truncate", false).start().awaitTermination()

// Write the decoded records out as JSON files.
processedDf2.limit(500).writeStream
  .option("path", "/Users/basan/Documents/dataingestion/avroingestion/data")
  .option("checkpointLocation", "/Users/basan/Documents/dataingestion/avroingestion/checkpointLocation")
  .format("json").start().awaitTermination()