spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
// Streaming source: subscribe to the Kafka topic "basan-topic" over SSL.
// The broker address, store locations and passwords are environment-specific;
// replace them with the values for your own cluster.
// Every line of the chain ends with a dot so the expression stays open when it is
// entered line by line in spark-shell; without the dot before `load()` the chain
// would end early and `records` would be a DataStreamReader, not a DataFrame.
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "basan-topic").
  option("kafka.security.protocol" , "SSL").
  option("kafka.bootstrap.servers", "kafka-basan.com:9093").
  option("kafka.ssl.truststore.location" , "/Users/basan/client.truststore.jks").
  option("kafka.ssl.truststore.password" , "changeit").
  option("kafka.ssl.keystore.location" , "/Users/basan/basan.com.jks").
  option("kafka.ssl.keystore.password" , "changeit").
  load()
// Console sink: print each micro-batch without truncating column values,
// triggering every 50 seconds in Update output mode; the query is then stopped.
val q = {
  val everyFiftySeconds = Trigger.ProcessingTime(50.seconds)
  records.writeStream
    .format("console")
    .option("truncate", false)
    .outputMode(OutputMode.Update())
    .trigger(everyFiftySeconds)
    .start()
}
q.stop()
Useful link:
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
// Streaming source: subscribe to the Kafka topic "basan-topic" over SSL.
// The broker address, store locations and passwords are environment-specific;
// replace them with the values for your own cluster.
// Every line of the chain ends with a dot so the expression stays open when it is
// entered line by line in spark-shell; without the dot before `load()` the chain
// would end early and `records` would be a DataStreamReader, not a DataFrame.
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "basan-topic").
  option("kafka.security.protocol" , "SSL").
  option("kafka.bootstrap.servers", "kafka-basan.com:9093").
  option("kafka.ssl.truststore.location" , "/Users/basan/client.truststore.jks").
  option("kafka.ssl.truststore.password" , "changeit").
  option("kafka.ssl.keystore.location" , "/Users/basan/basan.com.jks").
  option("kafka.ssl.keystore.password" , "changeit").
  load()
// Console sink: print each micro-batch without truncating column values,
// triggering every 50 seconds in Update output mode; the query is then stopped.
val q = {
  val everyFiftySeconds = Trigger.ProcessingTime(50.seconds)
  records.writeStream
    .format("console")
    .option("truncate", false)
    .outputMode(OutputMode.Update())
    .trigger(everyFiftySeconds)
    .start()
}
q.stop()
Useful link:
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html
No comments:
Post a Comment