PySpark中高效移除重复数据的两种策略(两种.高效.移除.重复.策略...)

wufei123 发布于 2025-09-02 阅读(5)

PySpark中高效移除重复数据的两种策略

本文详细阐述了在PySpark环境中处理重复数据的两种主要方法:针对原生PySpark SQL DataFrame的dropDuplicates()和针对PySpark Pandas DataFrame的drop_duplicates()。文章深入分析了这两种函数的用法、适用场景及关键区别,并通过代码示例和注意事项,指导用户根据其DataFrame类型选择最合适的去重策略,确保数据处理的准确性和效率。PySpark中重复数据处理概述

在数据处理和分析中,移除重复记录是数据清洗的关键步骤之一,尤其是在处理大规模数据集时。pyspark作为大数据处理的强大框架,提供了高效的机制来识别和消除dataframe中的重复行。然而,由于pyspark生态系统的发展,目前存在两种主要的dataframe类型,它们各自拥有不同的去重api:原生的pyspark.sql.dataframe和基于pandas api的pyspark.pandas.dataframe。理解这两种类型的差异及其对应的去重方法,对于编写健壮且高效的pyspark代码至关重要。

使用 pyspark.sql.DataFrame.dropDuplicates() 进行去重

pyspark.sql.DataFrame是PySpark的核心数据结构,它提供了类似于关系型数据库表的操作接口。对于这种类型的DataFrame,去重操作通过dropDuplicates()方法实现。

函数签名与用法

dropDuplicates()函数可以接受一个可选的列名列表作为参数,用于指定在哪些列上进行重复检查。如果不指定任何列,则默认会检查所有列。

DataFrame.dropDuplicates(subset=None)
  • subset: 可选参数,一个字符串列表,指定用于识别重复行的列。如果为None,则所有列都将用于去重。
示例代码

假设我们有一个包含客户ID的PySpark SQL DataFrame,我们希望移除重复的客户ID。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 初始化SparkSession
spark = SparkSession.builder.appName("DropDuplicatesSQL").getOrCreate()

# 创建一个示例PySpark SQL DataFrame
data = [("C001", "Alice"), ("C002", "Bob"), ("C001", "Alice"), ("C003", "Charlie"), ("C002", "Bob")]
columns = ["CUSTOMER_ID", "NAME"]
df_sql = spark.createDataFrame(data, columns)

print("原始 PySpark SQL DataFrame:")
df_sql.show()

# 1. 对所有列进行去重
df_distinct_all = df_sql.dropDuplicates()
print("所有列去重后的 DataFrame:")
df_distinct_all.show()

# 2. 仅根据 'CUSTOMER_ID' 列进行去重
# 注意:当仅根据子集去重时,对于重复的子集行,Spark会保留其中任意一行,其非子集列的值可能不确定。
# 在此示例中,由于(C001, Alice)是完全重复的,所以行为一致。
# 但如果数据是 (C001, Alice) 和 (C001, David),则去重后会保留其中一个。
df_distinct_id = df_sql.dropDuplicates(subset=["CUSTOMER_ID"])
print("根据 'CUSTOMER_ID' 列去重后的 DataFrame:")
df_distinct_id.show()

# 停止SparkSession
spark.stop()

输出示例:

原始 PySpark SQL DataFrame:
+-----------+-------+
|CUSTOMER_ID|   NAME|
+-----------+-------+
|       C001|  Alice|
|       C002|    Bob|
|       C001|  Alice|
|       C003|Charlie|
|       C002|    Bob|
+-----------+-------+

所有列去重后的 DataFrame:
+-----------+-------+
|CUSTOMER_ID|   NAME|
+-----------+-------+
|       C001|  Alice|
|       C002|    Bob|
|       C003|Charlie|
+-----------+-------+

根据 'CUSTOMER_ID' 列去重后的 DataFrame:
+-----------+-------+
|CUSTOMER_ID|   NAME|
+-----------+-------+
|       C001|  Alice|
|       C002|    Bob|
|       C003|Charlie|
+-----------+-------+
使用 pyspark.pandas.DataFrame.drop_duplicates() 进行去重

PySpark Pandas API(pyspark.pandas)旨在为熟悉Pandas库的用户提供一个在Spark上运行的相似接口。对于通过pyspark.pandas创建或转换而来的DataFrame,其去重方法与Pandas中的drop_duplicates()保持一致。

函数签名与用法

drop_duplicates()函数提供了更丰富的参数,以控制去重行为,例如保留哪个重复项(第一个、最后一个或不保留)。

DataFrame.drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False)
  • subset: 可选参数,一个字符串列表,指定用于识别重复行的列。如果为None,则所有列都将用于去重。
  • keep: 字符串,可选值有'first'、'last'或False。
    • 'first': 保留第一个出现的重复行。
    • 'last': 保留最后一个出现的重复行。
    • False: 删除所有重复行(即,如果某行有重复,则该行及其所有重复项都会被删除)。
  • inplace: 布尔值,如果为True,则在原始DataFrame上进行操作并返回None;如果为False,则返回一个新DataFrame。
  • ignore_index: 布尔值,如果为True,则重置结果DataFrame的索引。
示例代码
import pyspark.pandas as ps
from pyspark.sql import SparkSession

# 初始化SparkSession (pyspark.pandas 会自动使用现有的SparkSession)
spark = SparkSession.builder.appName("DropDuplicatesPandas").getOrCreate()

# 创建一个示例PySpark Pandas DataFrame
data = {"CUSTOMER_ID": ["C001", "C002", "C001", "C003", "C002"],
        "NAME": ["Alice", "Bob", "Alice", "Charlie", "Bob"]}
psdf = ps.DataFrame(data)

print("原始 PySpark Pandas DataFrame:")
print(psdf)

# 1. 对所有列进行去重 (默认 keep='first')
psdf_distinct_all = psdf.drop_duplicates()
print("所有列去重后的 DataFrame:")
print(psdf_distinct_all)

# 2. 仅根据 'CUSTOMER_ID' 列进行去重,保留第一个
psdf_distinct_id_first = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='first')
print("根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame:")
print(psdf_distinct_id_first)

# 3. 仅根据 'CUSTOMER_ID' 列进行去重,保留最后一个
psdf_distinct_id_last = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='last')
print("根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame:")
print(psdf_distinct_id_last)

# 4. 仅根据 'CUSTOMER_ID' 列进行去重,删除所有重复项
psdf_distinct_id_false = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep=False)
print("根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame:")
print(psdf_distinct_id_false)

# 停止SparkSession (如果需要,但通常在脚本结束时自动停止)
spark.stop()

输出示例:

原始 PySpark Pandas DataFrame:
  CUSTOMER_ID     NAME
0        C001    Alice
1        C002      Bob
2        C001    Alice
3        C003  Charlie
4        C002      Bob

所有列去重后的 DataFrame:
  CUSTOMER_ID     NAME
0        C001    Alice
1        C002      Bob
3        C003  Charlie

根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame:
  CUSTOMER_ID     NAME
0        C001    Alice
1        C002      Bob
3        C003  Charlie

根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame:
  CUSTOMER_ID     NAME
2        C001    Alice
4        C002      Bob
3        C003  Charlie

根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame:
  CUSTOMER_ID     NAME
3        C003  Charlie
选择正确的去重方法:关键区别与注意事项

选择dropDuplicates()还是drop_duplicates()的核心在于你正在操作的DataFrame类型。

  1. DataFrame类型识别:

    • 如果你通过spark.createDataFrame()或读取Spark数据源(如Parquet、CSV)创建DataFrame,你得到的是pyspark.sql.DataFrame。此时应使用dropDuplicates()。
    • 如果你通过pyspark.pandas.DataFrame()构造函数创建DataFrame,或者将pyspark.sql.DataFrame通过df.to_pandas_on_spark()(或旧版df.to_pandas())转换为pyspark.pandas.DataFrame,那么你应该使用drop_duplicates()。

    你可以通过type(df)或df.__class__.__name__来检查DataFrame的类型。

  2. API一致性:

    • dropDuplicates()是Spark原生的API,其行为和性能优化是基于Spark分布式计算模型设计的。
    • drop_duplicates()则遵循Pandas的API规范,对于熟悉Pandas的用户来说更直观。它在底层会转换为Spark操作,但其接口与Pandas保持高度一致。
  3. 功能差异:

    • dropDuplicates()相对简洁,主要关注去重本身。当基于子集去重时,它保留哪个重复项是不确定的(通常是Spark内部优化决定的任意一个)。
    • drop_duplicates()提供了keep参数,允许你精确控制保留第一个、最后一个还是删除所有重复项,这在某些业务场景下非常有用。
  4. 性能考量: 两种方法在底层都会触发Spark的distinct或groupBy操作,这通常涉及到数据的shuffle(混洗),对于大规模数据集而言,shuffle是计算密集型操作。因此,无论使用哪种方法,都应注意其对性能的影响。

总结

PySpark提供了两种强大且高效的方法来处理DataFrame中的重复数据:pyspark.sql.DataFrame的dropDuplicates()和pyspark.pandas.DataFrame的drop_duplicates()。理解它们各自的适用场景和功能特性是编写高效PySpark代码的关键。在实践中,务必根据你当前操作的DataFrame类型来选择正确的去重函数。当需要更精细地控制重复项的保留策略时,pyspark.pandas.DataFrame.drop_duplicates()的keep参数提供了更大的灵活性。始终牢记,去重操作可能涉及数据混洗,因此在处理超大规模数据集时,应评估其性能影响。

以上就是PySpark中高效移除重复数据的两种策略的详细内容,更多请关注知识资源分享宝库其它相关文章!

标签:  两种 高效 移除 

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。