почему фильтрация с использованием SQL-выражения лучше по сравнению с функцией, применяемой в DataSet в Spark

Я читаю книгу руководств по ультимативным искрам, и там говорится, что:

указав функцию, мы заставляем spark вычислить эту функцию в каждой строке в нашем dataSet ... Для простых фильтров всегда предпочтительнее написать выражение sql

Я не понимаю, почему выражение SQL будет лучше, так как выражение будет также применяться к каждой строке набора данных !!
кто-нибудь может дать мне больше деталей?

Всего 1 ответ


Используя выражение столбца, оптимизатор Spark имеет возможность оптимизировать запрос, поскольку он может «заглянуть» в фильтр и, возможно, переместить его в лучшее место, чтобы сократить время выполнения.

Пример:

Представьте, что у вас есть набор данных, состоящий из двух столбцов id и data и ваша логика сначала сгруппирует набор данных по столбцу id и суммирует значения data . После этой операции группировки должна быть сохранена только группа с id = 2 . В этом случае было бы быстрее сначала выполнить фильтрацию, а затем суммировать. Применяя фильтр в качестве выражения столбца, Spark может обнаружить эту оптимизацию и сначала применить фильтр:

val dfParquet = spark.read.parquet(<path to data>)
val groupedDf = dfParquet.groupBy("id").sum("data")
val groupedDfWithColumnFilter = groupedDf.filter("id = 2")
val groupedDfWithFilterFunction = groupedDf.filter(_.get(0).equals(2))

Если мы проверим план выполнения groupedDfWithColumnFilter мы получим

== Physical Plan ==
HashAggregate(keys=[id#0L], functions=[sum(data#1L)])
+- Exchange hashpartitioning(id#0L, 200)
   +- HashAggregate(keys=[id#0L], functions=[partial_sum(data#1L)])
      +- Project [id#0L, data#1L]
         +- Filter (isnotnull(id#0L) && (id#0L = 2))
            +- FileScan parquet [id#0L,data#1L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:.../id], PartitionFilters: [], 
                 PushedFilters: [IsNotNull(id), EqualTo(id,2)], ReadSchema: struct

Таким образом, фильтр применяется первым и даже помещается в программу чтения файлов паркета.

Однако план выполнения groupedDfWithFilterFunction показывает, что Spark не может выполнить эту оптимизацию и применяет фильтр в качестве последнего шага, таким образом, теряя оптимизацию:

== Physical Plan ==
Filter <function1>.apply
+- HashAggregate(keys=[id#0L], functions=[sum(data#1L)])
   +- Exchange hashpartitioning(id#0L, 200)
      +- HashAggregate(keys=[id#0L], functions=[partial_sum(data#1L)])
         +- FileScan parquet [id#0L,data#1L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:.../id], PartitionFilters: [], 
              PushedFilters: [], ReadSchema: struct


Еще один способ увидеть разницу - взглянуть на интерфейс Spark. Для моего теста я создал файл паркета с 10 миллионами строк в 100 разделах. На вкладке SQL видно, что из-за проталкиваемых фильтров для groupedDfWithColumnFilter Spark загружает с диска только около 200 тыс. groupedDfWithFilterFunction данных, тогда как для groupedDfWithFilterFunction Spark необходимо загружать все 10 млн. groupedDfWithFilterFunction :