Как вы можете обойти ограничение 2GB-буфера при использовании Dataset.groupByKey?

При использовании Dataset.groupByKey(_.key).mapGroups или Dataset.groupByKey(_.key).cogroup в Spark у меня возникла проблема, когда одна из групп приводит к более чем 2 ГБ данных.

Мне нужно нормализовать данные по группам, прежде чем я смогу их уменьшить, и я хотел бы разделить группы на более мелкие подгруппы, чтобы они лучше распространялись. Например, вот один из способов, по которым я попытался разделить группы:

val groupedInputs = inputData.groupByKey(_.key).mapGroups {
    case(key, inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group))
}

Но, к сожалению, я пытаюсь обойти это, моя работа всегда умирает с такой ошибкой: java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 23816 because the size after growing exceeds size limitation 2147483632 . При использовании сериализации Kryo я получаю другую Kryo serialization failed: Buffer overflow ошибка Kryo serialization failed: Buffer overflow рекомендующая увеличить значение spark.kryoserializer.buffer.max, но я уже увеличил ее до предела 2 ГБ.

Одним из решений, которое возникает для меня, является добавление случайного значения к ключам перед их группировкой. Это не идеально, поскольку он будет разделять каждую группу (а не только крупные), но я готов пожертвовать «идеалом» ради «работы». Этот код будет выглядеть примерно так:

val splitInputs = inputData.map( record => (record, ThreadLocalRandom.current.nextInt(splitFactor)))
val groupedInputs = splitInputs.groupByKey{ case(record, split) => (record.key, split)).mapGroups {
    case((key, _), inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group.map(_._1)))
}

Всего 2 ответа


Добавьте ключ соли и сделайте groupBy на свой ключ и соляной ключ, а затем

import scala.util.Random
    val start = 1
      val end   = 5
      val randUdf = udf({() => start + Random.nextInt((end - start) + 1)})

      val saltGroupBy=skewDF.withColumn("salt_key", randUdf())
        .groupBy(col("name"), col("salt_key"))

Таким образом, ваши косые данные не попадают в один исполнитель и не приводят к ограничению 2 ГБ.

Но вам нужно разработать логику для суммирования приведенного выше результата и, наконец, удалить ключ соли в конце.

Когда вы используете groupBy, все записи с одним и тем же ключом получат один исполнитель, а шея бутылки. Вышесказанное является одним из способов его смягчения.


Для этого случая, когда набор данных имел много перекосов, и было важно сгруппировать записи в группы с обычным размером, я решил обработать набор данных за два прохода. Сначала я использовал функцию окна для номера строк по ключу и преобразовал ее в «групповой индекс» на основе настраиваемого «maxGroupSize»:

// The "orderBy" doesn't seem necessary here, 
// but the row_number function requires it.
val partitionByKey = Window.partitionBy(key).orderBy(key)

val indexedData = inputData.withColumn("groupIndex", 
  (row_number.over(partitionByKey) / maxGroupSize).cast(IntegerType))
  .as[(Record, Int)]

Затем я могу группировать по ключевым словам и индексам и создавать группы, размер которых последовательно - ключи с большим количеством записей больше разделяются, а ключи с несколькими записями вообще не могут быть разделены.

indexedData.groupByKey{ case (record, groupIndex) => (record.key, groupIndex) }
  .mapGroups{ case((key, _), recordGroup) =>
      // Remove the index values before returning the groups
      (key, recordGroup.map(_._1))
  }

Есть идеи?

10000