Pyspark, фильтрующий элементы в столбце списков

Я пытаюсь отфильтровать данные в dataframe. Dataframe df имеет 2 столбца - query + href . В одной строке: query является случайной строкой, а href является списком строк. У меня есть еще один список, который называется urls со строками.

Поиск по URL-адресу из списка urls внутри списков столбцов href + положение URL- urls в списке href . Я пробовал df.filter(col("href")).isin(urls) urls df.filter(col("href")).isin(urls) но pyspark жалуется на список. + Я не могу сделать .collect () bcs объема данных.

Заранее спасибо!

По сути, это должно выглядеть так, но я не совсем уверен, как это сделать в pyspark:

for url in urls:
    if url in "href item list":
        print(query + url + "href item list".index(url)) # doesn't matter if index or position
    else:
        pass

Пример:

urls = [url1, url2, url3, url4, url5, url6, url7, url8]

query | href
------------
q1    | [url7, url11, url12, url13, url14]
q2    | [url1, url3, url5, url6]
q3    | [url1, url2, url8]

Output should look like 

q2 - url1 - 0
q3 - url1 - 0
q3 - url2 - 1
q2 - url3 - 1
q2 - url5 - 2
q2 - url6 - 3
q1 - url7 - 0
q3 - url8 - 2

Всего 3 ответа


Шаги в словах:

  1. explode колонку
  2. filter эти строки по известному URL
  3. collect результаты и искать каждый URL в urls

Приведенный ниже код разбит на небольшие шаги, чтобы упростить проверку промежуточных фреймов данных.

Предполагая, что у вас уже есть объект SparkSession именем ss , мы можем воссоздать ваш оригинальный DataFrame следующим образом:

df = ss.createDataFrame(
    [
        ("q1", ["url7", "url11", "url12", "url13", "url14"]),
        ("q2", ["url1", "url3", "url5", "url6"]),
        ("q3", ["url1", "url2", "url8"]),
    ],
    ["query", "href"],
)
urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]

Теперь мы применим шаги, описанные ранее:

import pyspark.sql.functions as sf

# Exploding the column "href".
exp_df = df.select("query", sf.explode(sf.col("href")).alias("href_sing"))
# Checking if the URL in the DataFrame exists in "urls".
# I suggest to convert "urls" into a "set" before this step: "set(urls)". It might 
# improve the performance of "isin", but this is just an optional optimization.
known_df = exp_df.select("*", sf.col("href_sing").isin(urls).alias("is_known"))
# Discard unknown URLs.
true_df = true_df = known_df.filter("is_known = True")
# The final results.
res = [
    (r["query"], r["href_sing"], urls.index(r["href_sing"]))
    for r in true_df.collect()
]

Проверка некоторых значений:

In [18]: df.show()      
+-----+--------------------+
|query|                href|
+-----+--------------------+
|   q1|[url7, url11, url...|
|   q2|[url1, url3, url5...|
|   q3|  [url1, url2, url8]|
+-----+--------------------+

In [19]: exp_df.show()                                                                    
+-----+---------+
|query|href_sing|
+-----+---------+
|   q1|     url7|
|   q1|    url11|
|   q1|    url12|
|   q1|    url13|
|   q1|    url14|
|   q2|     url1|
|   q2|     url3|
|   q2|     url5|
|   q2|     url6|
|   q3|     url1|
|   q3|     url2|
|   q3|     url8|
+-----+---------+

In [20]: true_df.show()                                                                   
+-----+---------+--------+
|query|href_sing|is_known|
+-----+---------+--------+
|   q1|     url7|    true|
|   q2|     url1|    true|
|   q2|     url3|    true|
|   q2|     url5|    true|
|   q2|     url6|    true|
|   q3|     url1|    true|
|   q3|     url2|    true|
|   q3|     url8|    true|
+-----+---------+--------+

In [23]: res                                                                              
Out[23]: 
[('q1', 'url7', 6),
 ('q2', 'url1', 0),
 ('q2', 'url3', 2),
 ('q2', 'url5', 4),
 ('q2', 'url6', 5),
 ('q3', 'url1', 0),
 ('q3', 'url2', 1),
 ('q3', 'url8', 7)]

Я предлагаю 1) сделать один столбец DataFrame ваших urls с помощью posexplode и 2) использовать posexplode для создания 3-столбцового DataFrame вашего запроса, href и index-position href, затем 3) внутреннего соединения двух

  1. Создать DataFrame из urls
from pyspark.sql.functions import explode, posexplode

urls = [
    (['url1', 'url2', 'url3', 'url4', 'url5', 'url6', 'url7', 'url8'],),
]
refs = (
    spark.createDataFrame(urls, ['ref']).
        select(
            explode('ref')
        )
)
refs.show(truncate=False)
# +----+
# |col |
# +----+
# |url1|
# |url2|
# |url3|
# |url4|
# |url5|
# |url6|
# |url7|
# |url8|
# +----+
  1. Создайте предоставленные вами примеры данных
data = [
    ("q1", ["url7", "url11", "url12", "url13", "url14"]),
    ("q2", ["url1", "url3", "url5", "url6"]),
    ("q3", ["url1", "url2", "url8"]),
]
df = spark.createDataFrame(data, ["query", "href"])
df.show(truncate=False)
# +-----+----------------------------------+
# |query|href                              |
# +-----+----------------------------------+
# |q1   |[url7, url11, url12, url13, url14]|
# |q2   |[url1, url3, url5, url6]          |
# |q3   |[url1, url2, url8]                |
# +-----+----------------------------------+
  1. Решение
(
    df.
        select(
            'query',
            posexplode('href')
        ).
        join(
            refs,
            'col',
            'inner'
        ).
        orderBy('col', 'query').
        show(truncate=False)
)
# +----+-----+---+                                                                
# |col |query|pos|
# +----+-----+---+
# |url1|q2   |0  |
# |url1|q3   |0  |
# |url2|q3   |1  |
# |url3|q2   |1  |
# |url5|q2   |2  |
# |url6|q2   |3  |
# |url7|q1   |0  |
# |url8|q3   |2  |
# +----+-----+---+

Если вы можете использовать spark2.4, вы можете сделать это без join или collect . Наиболее эффективным, простым и масштабируемым методом должно быть использование array_position и array_contains .

df.show()
+-----+----------------------------------+
|query|href                              |
+-----+----------------------------------+
|q1   |[url7, url11, url12, url13, url14]|
|q2   |[url1, url3, url5, url6]          |
|q3   |[url1, url2, url8]                |
+-----+----------------------------------+

urls=['url1', 'url2', 'url3', 'url4', 'url5', 'url6', 'url7', 'url8']
from pyspark.sql import functions as F
df.withColumn("href1", F.col("href")).select("query", F.explode("href").alias("col"),"href1")
  .withColumn("position", F.expr("""array_position(href1,col)-1""")).drop("href1")
  .withColumn("array", F.array(*(F.lit(x) for x in urls)))
  .withColumn("array_contains", F.expr("""array_contains(array,col)"""))
  .filter("array_contains= true")
  .select("query", F.col("col").alias("href"),"position").show()

+-----+----+--------+
|query|href|position|
+-----+----+--------+
|   q1|url7|       0|
|   q2|url1|       0|
|   q2|url3|       1|
|   q2|url5|       2|
|   q2|url6|       3|
|   q3|url1|       0|
|   q3|url2|       1|
|   q3|url8|       2|
+-----+----+--------+

Есть идеи?

10000