Как заполнить столбец значением, взятым из предыдущей строки, без естественного ключа разделения, используя Spark Scala DataFrame

Входные данные:

val input = Seq(
     |   ("1 February"),
     |   ("n"),
     |   ("c"),
     |   ("b"),
     |   ("2 February"),
     |   ("h"),
     |   ("w"),
     |   ("e"),
     |   ("3 February"),
     |   ("y"),
     |   ("s"),
     |   ("j")
     | ).toDF("lines")

input.show

Вход выглядит так:

+----------+
|     lines|
+----------+
|1 February|
|         n|
|         c|
|         b|
|2 February|
|         h|
|         w|
|         e|
|3 February|
|         y|
|         s|
|         j|
+----------+

Требуемый выход:

val output = Seq(
     |   ("1 February", "n"),
     |   ("1 February", "c"),
     |   ("1 February", "b"),
     |   ("2 February", "h"),
     |   ("2 February", "w"),
     |   ("2 February", "e"),
     |   ("3 February", "y"),
     |   ("3 February", "s"),
     |   ("3 February", "j")
     | ).toDF("date", "lines")

output.show

Требуемый вывод выглядит следующим образом:

+----------+-----+
|      date|lines|
+----------+-----+
|1 February|    n|
|1 February|    c|
|1 February|    b|
|2 February|    h|
|2 February|    w|
|2 February|    e|
|3 February|    y|
|3 February|    s|
|3 February|    j|
+----------+-----+

Я подумываю об использовании функции задержки в фрейме данных Scala Spark, но на самом деле не могу разобраться после нескольких часов. У кого-нибудь есть идея? Большое спасибо.

Всего 2 ответа


Шкала рекордов и прочего неясна.

Имхо, это пример плохой подачи, делающей жизнь более трудной и сложной, и если в масштабе ситуация такова, как соединение с большим столом с менее крупным соединением, то в Spark это не так хорошо. Я не уверен в лучшей стратегии разделения, поскольку есть проблемы, но если у вас есть большие данные, вы можете разделить их по месяцам (, годам). Или даже по неделям и добавить в выходной магазин.

Вам нужно будет объединить поля, но вот решение:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window

// Assume sorted input limited to a year? Unclear from the question. Volumes also not clear. Added some extra input.

val df = Seq(
    ("1 February"),
    ("n"),
    ("c"),
    ("b"),
    ("2 February"),
    ("h"),
    ("w"),
    ("e"),
    ("3 February"),
    ("y"),
    ("s"),
    ("j"),
    ("1 March"),
    ("c"),
    ("b"),
    ("x")
   ).toDF("line")

// Fill the other months in accordingly.
// Years aspect? Not considered.

val m = Map("january" -> 1,"february" -> 2,"march" -> 3, "april" -> 4)
val dfM = m.toSeq.toDF("month", "monthNum")
// monthNum not used in hindsight, but left in.

// Add a non-consecutive Sequence Nr. This not an issue as we use range checking later on, not a previous value of -1 relative to a Sequence Nr to check directly against.
// Partition position retained. The other option is .zipWithIndex with rdd conversion and back again to df. 
// monotonically_increasing_id is non-deterministic but it should be OK here as far as I can see.

val df2 = df.withColumn("idx", monotonically_increasing_id())

val df3 = df2.withColumn("_tmp", split($"line", " "))
             .select($"idx",
                 $"_tmp".getItem(0).as("col1"),
                 $"_tmp".getItem(1).as("col2")
                    )
            .drop("_tmp")

val df4 = (df3.join(broadcast(dfM), lower(df3.col("col2")) === dfM.col("month")).drop("month")) 

val w = org.apache.spark.sql.expressions.Window.orderBy("idx")  
val df5 = df4.withColumn("idxNextDate", (lead("idx", 1, 999999999999L).over(w)-1)).toDF("idx1","dte","mth", "mthNum", "idxNextDate")

// Not sure about performance, quite tricky, get large table / large table situation which has no real performance solution with Spark except for brute force parallelism.
// Could filter out the non-Null stuff.
val df6 = df3.filter($"col2".isNull)
val df7 = df6.join(df5, ($"idx" >= $"idx1") && ($"idx" <= $"idxNextDate"))
             .select($"dte", $"mth", $"col1".alias("lines"))
df7.show(false)

возвращает:

+---+--------+-----+
|dte|mth     |lines|
+---+--------+-----+
|1  |February|n    |
|1  |February|c    |
|1  |February|b    |
|2  |February|h    |
|2  |February|w    |
|2  |February|e    |
|3  |February|y    |
|3  |February|s    |
|3  |February|j    |
|1  |March   |c    |
|1  |March   |b    |
|1  |March   |x    |
+---+--------+-----+

Я подумаю о разделении, но в этом алгоритме, похоже, сложно выбрать подходящую стратегию. Я не уверен, что это элегантно возможно разделить, это хороший пример обработки, который не вписывается в стиль обработки Spark. Я могу быть исправлен, хотя.

ОБНОВЛЕНИЕ Не могу найти подходящий раздел. Поэтому нужно обрабатывать небольшие наборы.

Рекомендация в этом случае заключается в том, чтобы адаптировать канал так, чтобы сделать вещи тривиальными, а не заставлять людей работать над решением такой проблемы.


val input1 = input.withColumn("RowId", monotonicallyIncreasingId).withColumn("Date", col("lines"))
val len = udf((str: String) => str.size)
val rowIds = input1.filter(len(col("lines")) < 13 && len(col("lines")) > 4).select("RowId").collect.map(_(0)).map(_.toString.toInt)
val rowIdSlided = (rowIds :+ input.count).sliding(2).toList
val output = rowIdSlided.foldLeft(input1)((acc, rowIdList) => {acc.withColumn("Date", when((col("RowId") > rowIdList(0) && col("RowId") < rowIdList(1)), ({val date = input1.filter(col("RowId") === rowIdList(0)).select("lines").collect.map(_(0)).map(_.toString); date(0)})).otherwise(col("Date")))}).drop("RowId")

Это должно дать вывод:

+----------+----------+
|     lines|      Date|
+----------+----------+
|1 February|1 February|
|         n|1 February|
|         c|1 February|
|         b|1 February|
|2 February|2 February|
|         h|2 February|
|         w|2 February|
|         e|2 February|
|3 February|3 February|
|         y|3 February|
|         s|3 February|
|         j|3 February|
+----------+----------+

Есть идеи?

10000