Powered By Blogger

Thursday, November 21, 2019

Spark shell structured streaming with Spark 2.3.2

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

// Streaming Dataset backed by the Kafka source: reads the "basan-topic" topic
// over SSL. Note: options prefixed with "kafka." are passed straight through to
// the underlying Kafka consumer (security.protocol, truststore/keystore config).
// BUG FIX: the original chain was broken — the line before `load` had no trailing
// dot, so `load` parsed as a dangling identifier and the snippet failed in the REPL.
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "basan-topic").
  option("kafka.security.protocol" , "SSL").
  option("kafka.bootstrap.servers", "kafka-basan.com:9093").
  option("kafka.ssl.truststore.location" , "/Users/basan/client.truststore.jks").
  option("kafka.ssl.truststore.password" ,  "changeit").
  option("kafka.ssl.keystore.location" , "/Users/basan/basan.com.jks").
  option("kafka.ssl.keystore.password" , "changeit").
  load




// Sink the Kafka records to the console, firing a micro-batch every 50 seconds.
// Update output mode emits only rows changed since the last trigger; truncate=false
// prints full column values. `start()` kicks off the query and returns its handle.
val q = records
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(50.seconds))
  .outputMode(OutputMode.Update)
  .start()

// Gracefully stop the running streaming query when done.
q.stop


Useful link:
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html

No comments:

Post a Comment