package com.basan.day3

import org.apache.spark._
import org.apache.log4j._
object ErrorWarnCount {

  def main(args: Array[String]): Unit = {

    // Silence Spark's INFO logging so only our output is visible.
    Logger.getLogger("org").setLevel(Level.ERROR)

    val sc = new SparkContext("local[*]", "ERROR count")
    val list: List[String] = List(
      "WARN : Tuesday 4 September 0408",
      "ERROR : Tuesday 4 September 0408",
      "ERROR : Tuesday 4 September 0408",
      "ERROR : Tuesday 4 September 0408")
    val originalRDD = sc.parallelize(list)

    // Map each log line to a (level, 1) pair, keyed on the text before ":".
    val pairRDD = originalRDD.map(x => {
      val columns = x.split(":")
      val level = columns(0).trim // trim the space surrounding ":"
      (level, 1)
    })

    // Sum the 1s per key to get the count of each log level.
    val resultantRDD = pairRDD.reduceByKey((x, y) => x + y)
    resultantRDD.collect().foreach(println)
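
    // With the sample input, the collected pairs are (ERROR,3) and (WARN,1);
    // the order in which collect() returns them is not guaranteed.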
println("single line conversion")
//convertin to single line
sc.parallelize(list).map(x=> {
(x.split(":")(0),1)
}).reduceByKey(_+_).collect().foreach(println)
    sc.stop()
  }
}
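
The same per-level totals can also be computed without building an explicit pair RDD: RDD.countByValue() returns the counts directly to the driver as a Map. A minimal sketch, assuming the same sc and list as above:

// Count occurrences of each log level with countByValue.
// Fine here because the number of distinct keys is tiny;
// the entire result is materialized on the driver.
val levelCounts: scala.collection.Map[String, Long] =
  sc.parallelize(list)
    .map(_.split(":")(0).trim) // extract and trim the log level
    .countByValue()

levelCounts.foreach(println)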