在数据分析场景中,我们经常需要将两个或多个数据集进行关联。当关联条件不仅包括精确匹配的键,还涉及时间范围,并且要求收集时间窗口内所有符合条件的记录时,传统的合并操作(如merge或merge_asof)可能无法直接满足需求。例如,我们可能需要找出每笔交易发生前7天内的所有浏览记录。本文将通过一个具体案例,详细介绍两种在pandas中实现此类复杂数据关联与聚合的方法。
问题描述与数据准备假设我们有两个DataFrame:trade记录了交易信息,view记录了用户的浏览历史。我们需要为每一笔交易,找出其发生前7天内,由同一用户(person)对同一商品(code)进行的所有浏览记录。
首先,我们创建示例数据并进行初步的数据类型转换,确保日期列为datetime类型,这对于时间相关的计算至关重要。
import pandas as pd import janitor # 导入pyjanitor库 # 交易数据 trade = pd.DataFrame({ 'date': ['2019-08-31', '2019-09-01', '2019-09-04'], 'person': [1, 1, 2], 'code': [123, 123, 456], 'value1': [1, 2, 3] }) # 浏览历史数据 view = pd.DataFrame({ 'date': ['2019-08-29', '2019-08-29', '2019-08-30', '2019-08-31', '2019-09-01', '2019-09-01', '2019-09-01', '2019-09-02', '2019-09-03'], 'person': [1, 1, 1, 2, 1, 2, 2, 1, 2], 'code': [123, 456, 123, 456, 123, 123, 456, 123, 456], 'value': [1, 2, 3, 4, 5, 6, 7, 8, 9] }) # 将日期列转换为datetime类型 trade['date'] = pd.to_datetime(trade['date']) view['date'] = pd.to_datetime(view['date']) print("交易数据 (trade DataFrame):") print(trade) print("\n浏览历史数据 (view DataFrame):") print(view)
期望结果: 对于每一笔交易,我们希望得到一个包含其交易日期、人员、商品代码、交易值,以及一个列表,列出该交易前7天内所有相关浏览记录的日期和对应的浏览值。
pd.merge_asof虽然可以处理时间序列的近似合并,但它的设计是为每个左侧记录找到一个最近的右侧记录,而不是收集时间窗口内的所有记录。因此,我们需要更灵活的方法。
解决方案一:使用 pyjanitor.conditional_join (推荐)pyjanitor是一个增强Pandas功能的库,其conditional_join函数特别适用于处理涉及多个条件(包括范围条件)的复杂合并。它能够高效地执行非等值连接,并且可以保留所有符合条件的匹配项。
实现步骤:- 创建辅助列 start_date: 对于trade DataFrame中的每笔交易,计算其发生日期减去7天作为时间窗口的起始日期。
- 重命名 view DataFrame的列: 为了在合并后区分来源,我们将view DataFrame中的date和value列重命名为view_dates和view_values。
-
执行 conditional_join:
- 指定左侧DataFrame (trade) 和右侧DataFrame (view的重命名版本)。
- 定义合并条件:
- ('start_date', 'view_dates', '<='): 浏览日期必须大于或等于交易的起始日期。
- ('date', 'view_dates', '>='): 浏览日期必须小于或等于交易的结束日期(即交易日期)。
- ('person', 'person', '=='): 用户ID必须匹配。
- ('code', 'code', '=='): 商品代码必须匹配。
- right_columns参数用于指定从右侧DataFrame中保留的列。
-
后处理与聚合:
- 删除不再需要的start_date辅助列。
- 将view_dates转换回字符串格式(如果需要与期望输出一致)。
- 使用groupby结合agg(list)将每个交易的所有匹配浏览记录聚合为列表。
# 解决方案一:使用pyjanitor.conditional_join out_janitor = ( trade .assign(start_date=lambda d: d['date'].sub(pd.DateOffset(days=7))) # 1. 创建辅助列 .conditional_join( view.rename(columns={'date': 'view_dates', 'value': 'view_values'}), # 2. 重命名列 ('start_date', 'view_dates', '<='), # 条件1: 浏览日期 >= 交易开始日期 ('date', 'view_dates', '>='), # 条件2: 浏览日期 <= 交易日期 ('person', 'person', '=='), # 条件3: 人员匹配 ('code', 'code', '=='), # 条件4: 商品代码匹配 right_columns=['view_dates', 'view_values'] # 保留右侧的这些列 ) .drop(columns='start_date') # 删除辅助列 .assign(view_dates=lambda d: d['view_dates'].dt.strftime('%Y-%m-%d')) # 格式化日期 .groupby(list(trade), as_index=False).agg(list) # 按原始trade列分组并聚合为列表 ) print("\n解决方案一 (pyjanitor.conditional_join) 结果:") print(out_janitor)
结果分析:pyjanitor.conditional_join能够高效地处理多个非等值条件,并且会返回所有符合条件的匹配项。最终通过groupby.agg(list)将这些匹配项聚合到每个原始交易记录下,生成了我们期望的列表结构。
解决方案二:纯 Pandas 实现如果不想引入额外的库,也可以通过纯Pandas操作实现,但对于大数据集而言,其效率可能不如conditional_join。这种方法的核心思想是先进行一次宽泛的合并,然后进行严格的筛选。
实现步骤:- 宽泛合并: 使用merge函数基于共同的person和code列进行合并。这将产生一个包含所有可能组合的中间DataFrame。
- 时间条件筛选: 在合并结果上应用时间窗口条件进行筛选。即,view_dates必须在trade的date之前,且在trade的date减去7天之后。
- 后处理与聚合: 与pyjanitor方案类似,对view_dates进行格式化,然后使用groupby结合agg(list)进行聚合。
# 解决方案二:纯Pandas实现 out_pandas = ( trade .merge(view.rename(columns={'date': 'view_dates', 'value': 'view_values'}), # 1. 宽泛合并 on=['person', 'code']) .loc[lambda d: d['date'].gt(d['view_dates']) & # 2. 时间条件筛选: 交易日期 > 浏览日期 d['date'].sub(pd.DateOffset(days=7)).le(d['view_dates']) # 交易日期-7天 <= 浏览日期 ] .assign(view_dates=lambda d: d['view_dates'].dt.strftime('%Y-%m-%d')) # 格式化日期 .groupby(list(trade), as_index=False).agg(list) # 按原始trade列分组并聚合为列表 ) print("\n解决方案二 (纯Pandas) 结果:") print(out_pandas)
结果分析: 纯Pandas方案通过先生成一个较大的中间结果(所有person和code匹配的组合),然后进行两次时间条件的过滤。虽然也能得到正确结果,但中间DataFrame的大小可能会显著增加内存消耗和计算时间,尤其是在原始数据量较大时。
总结与注意事项本文介绍了两种在Pandas中实现基于多条件和时间窗口聚合数据的有效方法:
- pyjanitor.conditional_join: 推荐用于处理复杂、非等值连接条件的场景。它能够更高效地处理时间范围匹配,避免生成过大的中间DataFrame,尤其适用于大型数据集。
- 纯Pandas方案: 适用于对性能要求不那么极致,或数据集规模相对较小的场景。其优点是不依赖第三方库,但可能在处理大数据时面临性能瓶颈。
关键注意事项:
- 日期类型转换: 始终确保涉及时间计算的列是Pandas的datetime类型,这是进行日期算术的基础。
- 时间偏移量: 使用pd.DateOffset可以方便地进行日期加减操作,例如pd.DateOffset(days=7)。
- groupby().agg(list): 这是将多个匹配项聚合到列表中的关键步骤。list(trade)作为groupby的参数,确保了按照原始trade DataFrame的所有列进行分组,从而保持了交易记录的唯一性。
- 性能考量: 对于大规模数据集,pyjanitor.conditional_join通常会提供更好的性能,因为它在底层优化了非等值连接的执行。纯Pandas方案的merge操作可能会导致笛卡尔积的子集,从而消耗更多内存和计算资源。
选择哪种方案取决于具体的需求、数据集大小以及对外部库的接受程度。在追求效率和简洁性时,pyjanitor提供了一个强大的工具。
以上就是Pandas中基于多条件和时间窗口匹配并聚合多条记录的详细内容,更多请关注知识资源分享宝库其它相关文章!
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。