在数据处理和分析中,移除重复记录是数据清洗的关键步骤之一,尤其是在处理大规模数据集时。pyspark作为大数据处理的强大框架,提供了高效的机制来识别和消除dataframe中的重复行。然而,由于pyspark生态系统的发展,目前存在两种主要的dataframe类型,它们各自拥有不同的去重api:原生的pyspark.sql.dataframe和基于pandas api的pyspark.pandas.dataframe。理解这两种类型的差异及其对应的去重方法,对于编写健壮且高效的pyspark代码至关重要。
使用 pyspark.sql.DataFrame.dropDuplicates() 进行去重pyspark.sql.DataFrame是PySpark的核心数据结构,它提供了类似于关系型数据库表的操作接口。对于这种类型的DataFrame,去重操作通过dropDuplicates()方法实现。
函数签名与用法dropDuplicates()函数可以接受一个可选的列名列表作为参数,用于指定在哪些列上进行重复检查。如果不指定任何列,则默认会检查所有列。
DataFrame.dropDuplicates(subset=None)
- subset: 可选参数,一个字符串列表,指定用于识别重复行的列。如果为None,则所有列都将用于去重。
假设我们有一个包含客户ID的PySpark SQL DataFrame,我们希望移除重复的客户ID。
from pyspark.sql import SparkSession from pyspark.sql.functions import col # 初始化SparkSession spark = SparkSession.builder.appName("DropDuplicatesSQL").getOrCreate() # 创建一个示例PySpark SQL DataFrame data = [("C001", "Alice"), ("C002", "Bob"), ("C001", "Alice"), ("C003", "Charlie"), ("C002", "Bob")] columns = ["CUSTOMER_ID", "NAME"] df_sql = spark.createDataFrame(data, columns) print("原始 PySpark SQL DataFrame:") df_sql.show() # 1. 对所有列进行去重 df_distinct_all = df_sql.dropDuplicates() print("所有列去重后的 DataFrame:") df_distinct_all.show() # 2. 仅根据 'CUSTOMER_ID' 列进行去重 # 注意:当仅根据子集去重时,对于重复的子集行,Spark会保留其中任意一行,其非子集列的值可能不确定。 # 在此示例中,由于(C001, Alice)是完全重复的,所以行为一致。 # 但如果数据是 (C001, Alice) 和 (C001, David),则去重后会保留其中一个。 df_distinct_id = df_sql.dropDuplicates(subset=["CUSTOMER_ID"]) print("根据 'CUSTOMER_ID' 列去重后的 DataFrame:") df_distinct_id.show() # 停止SparkSession spark.stop()
输出示例:
原始 PySpark SQL DataFrame: +-----------+-------+ |CUSTOMER_ID| NAME| +-----------+-------+ | C001| Alice| | C002| Bob| | C001| Alice| | C003|Charlie| | C002| Bob| +-----------+-------+ 所有列去重后的 DataFrame: +-----------+-------+ |CUSTOMER_ID| NAME| +-----------+-------+ | C001| Alice| | C002| Bob| | C003|Charlie| +-----------+-------+ 根据 'CUSTOMER_ID' 列去重后的 DataFrame: +-----------+-------+ |CUSTOMER_ID| NAME| +-----------+-------+ | C001| Alice| | C002| Bob| | C003|Charlie| +-----------+-------+使用 pyspark.pandas.DataFrame.drop_duplicates() 进行去重
PySpark Pandas API(pyspark.pandas)旨在为熟悉Pandas库的用户提供一个在Spark上运行的相似接口。对于通过pyspark.pandas创建或转换而来的DataFrame,其去重方法与Pandas中的drop_duplicates()保持一致。
函数签名与用法drop_duplicates()函数提供了更丰富的参数,以控制去重行为,例如保留哪个重复项(第一个、最后一个或不保留)。
DataFrame.drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False)
- subset: 可选参数,一个字符串列表,指定用于识别重复行的列。如果为None,则所有列都将用于去重。
- keep: 字符串,可选值有'first'、'last'或False。
- 'first': 保留第一个出现的重复行。
- 'last': 保留最后一个出现的重复行。
- False: 删除所有重复行(即,如果某行有重复,则该行及其所有重复项都会被删除)。
- inplace: 布尔值,如果为True,则在原始DataFrame上进行操作并返回None;如果为False,则返回一个新DataFrame。
- ignore_index: 布尔值,如果为True,则重置结果DataFrame的索引。
import pyspark.pandas as ps from pyspark.sql import SparkSession # 初始化SparkSession (pyspark.pandas 会自动使用现有的SparkSession) spark = SparkSession.builder.appName("DropDuplicatesPandas").getOrCreate() # 创建一个示例PySpark Pandas DataFrame data = {"CUSTOMER_ID": ["C001", "C002", "C001", "C003", "C002"], "NAME": ["Alice", "Bob", "Alice", "Charlie", "Bob"]} psdf = ps.DataFrame(data) print("原始 PySpark Pandas DataFrame:") print(psdf) # 1. 对所有列进行去重 (默认 keep='first') psdf_distinct_all = psdf.drop_duplicates() print("所有列去重后的 DataFrame:") print(psdf_distinct_all) # 2. 仅根据 'CUSTOMER_ID' 列进行去重,保留第一个 psdf_distinct_id_first = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='first') print("根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame:") print(psdf_distinct_id_first) # 3. 仅根据 'CUSTOMER_ID' 列进行去重,保留最后一个 psdf_distinct_id_last = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='last') print("根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame:") print(psdf_distinct_id_last) # 4. 仅根据 'CUSTOMER_ID' 列进行去重,删除所有重复项 psdf_distinct_id_false = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep=False) print("根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame:") print(psdf_distinct_id_false) # 停止SparkSession (如果需要,但通常在脚本结束时自动停止) spark.stop()
输出示例:
原始 PySpark Pandas DataFrame: CUSTOMER_ID NAME 0 C001 Alice 1 C002 Bob 2 C001 Alice 3 C003 Charlie 4 C002 Bob 所有列去重后的 DataFrame: CUSTOMER_ID NAME 0 C001 Alice 1 C002 Bob 3 C003 Charlie 根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame: CUSTOMER_ID NAME 0 C001 Alice 1 C002 Bob 3 C003 Charlie 根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame: CUSTOMER_ID NAME 2 C001 Alice 4 C002 Bob 3 C003 Charlie 根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame: CUSTOMER_ID NAME 3 C003 Charlie选择正确的去重方法:关键区别与注意事项
选择dropDuplicates()还是drop_duplicates()的核心在于你正在操作的DataFrame类型。
-
DataFrame类型识别:
- 如果你通过spark.createDataFrame()或读取Spark数据源(如Parquet、CSV)创建DataFrame,你得到的是pyspark.sql.DataFrame。此时应使用dropDuplicates()。
- 如果你通过pyspark.pandas.DataFrame()构造函数创建DataFrame,或者将pyspark.sql.DataFrame通过df.to_pandas_on_spark()(或旧版df.to_pandas())转换为pyspark.pandas.DataFrame,那么你应该使用drop_duplicates()。
你可以通过type(df)或df.__class__.__name__来检查DataFrame的类型。
-
API一致性:
- dropDuplicates()是Spark原生的API,其行为和性能优化是基于Spark分布式计算模型设计的。
- drop_duplicates()则遵循Pandas的API规范,对于熟悉Pandas的用户来说更直观。它在底层会转换为Spark操作,但其接口与Pandas保持高度一致。
-
功能差异:
- dropDuplicates()相对简洁,主要关注去重本身。当基于子集去重时,它保留哪个重复项是不确定的(通常是Spark内部优化决定的任意一个)。
- drop_duplicates()提供了keep参数,允许你精确控制保留第一个、最后一个还是删除所有重复项,这在某些业务场景下非常有用。
性能考量: 两种方法在底层都会触发Spark的distinct或groupBy操作,这通常涉及到数据的shuffle(混洗),对于大规模数据集而言,shuffle是计算密集型操作。因此,无论使用哪种方法,都应注意其对性能的影响。
PySpark提供了两种强大且高效的方法来处理DataFrame中的重复数据:pyspark.sql.DataFrame的dropDuplicates()和pyspark.pandas.DataFrame的drop_duplicates()。理解它们各自的适用场景和功能特性是编写高效PySpark代码的关键。在实践中,务必根据你当前操作的DataFrame类型来选择正确的去重函数。当需要更精细地控制重复项的保留策略时,pyspark.pandas.DataFrame.drop_duplicates()的keep参数提供了更大的灵活性。始终牢记,去重操作可能涉及数据混洗,因此在处理超大规模数据集时,应评估其性能影响。
以上就是PySpark中高效移除重复数据的两种策略的详细内容,更多请关注知识资源分享宝库其它相关文章!
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。