The window function below keeps only the latest record per documentId: rows are ranked within each partition by descending timestamp, and only the top-ranked row survives.

// Assumes a spark-shell / SparkSession named `spark` for the implicits below.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import spark.implicits._

val df = Seq(
  ("d1", "2018-09-20 10:00:00", "blah1"),
  ("d2", "2018-09-20 09:00:00", "blah2"),
  ("d1", "2018-09-20 10:01:00", "blahnew")
).toDF("documentId", "timestamp", "anotherField")

// Rank each document's rows by timestamp, newest first.
val w = Window.partitionBy($"documentId").orderBy($"timestamp".desc)

// Keep only the top-ranked (latest) row per documentId, then drop the helper column.
val resultDf = df.withColumn("rownum", row_number().over(w))
  .where($"rownum" === 1)
  .drop("rownum")

resultDf.show()
input:
+----------+-------------------+------------+
|documentId| timestamp|anotherField|
+----------+-------------------+------------+
| d1|2018-09-20 10:00:00| blah1|
| d2|2018-09-20 09:00:00| blah2|
| d1|2018-09-20 10:01:00| blahnew|
+----------+-------------------+------------+
output:
+----------+-------------------+------------+
|documentId| timestamp|anotherField|
+----------+-------------------+------------+
| d2|2018-09-20 09:00:00| blah2|
| d1|2018-09-20 10:01:00| blahnew|
+----------+-------------------+------------+
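For reference, the same deduplication can be expressed in Spark SQL. This is a minimal sketch, assuming the df defined above and a SparkSession named spark; the view name docs is arbitrary:

// Register the DataFrame so it can be queried with SQL.
df.createOrReplaceTempView("docs")

val sqlResultDf = spark.sql("""
  SELECT documentId, timestamp, anotherField
  FROM (
    SELECT *,
           row_number() OVER (PARTITION BY documentId ORDER BY timestamp DESC) AS rownum
    FROM docs
  ) ranked
  WHERE rownum = 1
""")

sqlResultDf.show()

Note that dropDuplicates("documentId") would also return one row per key, but Spark makes no guarantee about which row it keeps, so the window (or SQL) approach is the right choice when you specifically need the latest record.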
("d1", "2018-09-20 10:00:00", "blah1"),
("d2", "2018-09-20 09:00:00", "blah2"),
("d1", "2018-09-20 10:01:00", "blahnew")
).toDF("documentId","timestamp","anotherField")
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"documentId").orderBy($"timestamp".desc)
val Resultdf = df.withColumn("rownum", row_number.over(w))
.where($"rownum" === 1).drop("rownum")
Resultdf.show()
input:
+----------+-------------------+------------+
|documentId| timestamp|anotherField|
+----------+-------------------+------------+
| d1|2018-09-20 10:00:00| blah1|
| d2|2018-09-20 09:00:00| blah2|
| d1|2018-09-20 10:01:00| blahnew|
+----------+-------------------+------------+
output:
+----------+-------------------+------------+
|documentId| timestamp|anotherField|
+----------+-------------------+------------+
| d2|2018-09-20 09:00:00| blah2|
| d1|2018-09-20 10:01:00| blahnew|
+----------+-------------------+------------+