在数据分析中,我们经常需要处理两个或多个时间序列数据集,并根据特定的时间窗口进行关联。例如,我们可能有一个记录用户交易(trade)的dataframe,以及另一个记录用户浏览历史(view)的dataframe。我们的目标是为每笔交易找出其发生前7天内的所有相关浏览记录,并将其聚合到交易记录中。这里的“相关”不仅指时间上的接近,还包括了用户(person)和商品代码(code)等共同标识符的匹配。
传统的pd.merge_asof函数常用于近似合并,但它通常只为每个源行找到一个最近的匹配项,或者在指定容忍度内进行匹配,但其设计并非为了收集一个源行对应的所有可能匹配项。例如,如果一个交易对应多个浏览记录,merge_asof可能无法将所有这些记录都关联起来,因为它倾向于“消费”匹配到的行。因此,我们需要一种更灵活的方法来实现这种多对多的时间窗口内关联。
数据准备首先,我们创建两个示例DataFrame来模拟交易数据和浏览历史数据:
import pandas as pd import janitor # 稍后会用到 # 交易数据 trade = pd.DataFrame({ 'date': ['2019-08-31', '2019-09-01', '2019-09-04'], 'person': [1, 1, 2], 'code': [123, 123, 456], 'value1': [1, 2, 3] }) # 浏览历史数据 view = pd.DataFrame({ 'date': ['2019-08-29', '2019-08-29', '2019-08-30', '2019-08-31', '2019-09-01', '2019-09-01', '2019-09-01', '2019-09-02', '2019-09-03'], 'person': [1, 1, 1, 2, 1, 2, 2, 1, 2], 'code': [123, 456, 123, 456, 123, 123, 456, 123, 456], 'value': [1, 2, 3, 4, 5, 6, 7, 8, 9] }) # 将日期列转换为datetime对象,这是时间序列操作的基础 trade['date'] = pd.to_datetime(trade['date']) view['date'] = pd.to_datetime(view['date']) print("交易数据 (trade DataFrame):") print(trade) print("\n浏览历史数据 (view DataFrame):") print(view)解决方案一:使用 pyjanitor.conditional_join (推荐)
pyjanitor库提供了一个强大的conditional_join函数,专门用于执行基于多个条件的非等值连接。它在处理此类时间窗口关联问题时,通常比纯Pandas方法更高效。
实现步骤- 创建时间窗口辅助列: 为trade DataFrame添加一个start_date列,表示每笔交易发生日期前7天的起始日期。
- 重命名 view DataFrame 列: 为了避免合并后的列名冲突,并使输出结果更清晰,我们预先重命名view DataFrame中的date和value列。
-
执行条件连接: 使用conditional_join函数,指定以下连接条件:
- trade.start_date <= view.view_dates (浏览记录日期不早于交易前7天)
- trade.date >= view.view_dates (浏览记录日期不晚于交易日期)
- trade.person == view.person (用户ID匹配)
- trade.code == view.code (商品代码匹配)
- 清理与格式化: 删除不再需要的辅助列start_date,并将view_dates列格式化为字符串。
- 聚合结果: 根据trade DataFrame的原始列进行分组,并将匹配到的view_dates和view_values聚合成列表。
out_janitor = (trade .assign(start_date=lambda d: d['date'].sub(pd.DateOffset(days=7))) # 步骤1 .conditional_join(view.rename(columns={'date': 'view_dates', 'value': 'view_values'}), # 步骤2 ('start_date', 'view_dates', '<='), # 步骤3: 条件1 ('date', 'view_dates', '>='), # 步骤3: 条件2 ('person', 'person', '=='), # 步骤3: 条件3 ('code', 'code', '=='), # 步骤3: 条件4 right_columns=['view_dates', 'view_values'] # 保留右侧特定列 ) .drop(columns='start_date') # 步骤4: 删除辅助列 .assign(view_dates=lambda d: d['view_dates'].dt.strftime('%Y-%m-%d')) # 步骤4: 格式化日期 .groupby(list(trade.columns), as_index=False).agg(list) # 步骤5: 分组聚合 ) print("\n使用 pyjanitor.conditional_join 的结果:") print(out_janitor)注意事项
- pyjanitor是一个第三方库,需要通过pip install pyjanitor安装。
- conditional_join在处理大型数据集和复杂连接条件时,通常比纯Pandas的merge后筛选更高效,因为它在内部可能使用了更优化的算法。
如果不想引入额外的库,也可以纯粹使用Pandas的merge和筛选操作来达到相同的效果。这种方法虽然直观,但在处理大型数据集时,可能会因为生成一个非常大的中间DataFrame而导致性能问题。
实现步骤- 执行全量合并: 首先,基于共同的person和code列,对trade和view DataFrame执行一个内连接(merge)。这将生成所有可能的person和code组合的交易与浏览记录。
-
筛选时间窗口: 在合并后的DataFrame上,应用时间窗口筛选条件:
- trade.date > view.view_dates (浏览记录必须发生在交易之前)
- trade.date - 7天 <= view.view_dates (浏览记录必须发生在交易前7天内)
- 格式化与聚合: 将view_dates列格式化为字符串,然后按照trade DataFrame的原始列进行分组,并将匹配到的view_dates和view_values聚合成列表。
out_pandas = (trade .merge(view.rename(columns={'date': 'view_dates', 'value': 'view_values'}), # 步骤1: 全量合并并重命名 on=['person', 'code']) .loc[lambda d: d['date'].gt(d['view_dates']) & # 步骤2: 筛选条件1 (浏览在交易之前) d['date'].sub(pd.DateOffset(days=7)).le(d['view_dates']) # 步骤2: 筛选条件2 (浏览在交易前7天内) ] .assign(view_dates=lambda d: d['view_dates'].dt.strftime('%Y-%m-%d')) # 步骤3: 格式化日期 .groupby(list(trade.columns), as_index=False).agg(list) # 步骤3: 分组聚合 ) print("\n使用纯 Pandas 实现的结果:") print(out_pandas)注意事项
- 这种方法在merge阶段会生成一个包含所有person和code组合的笛卡尔积(如果on条件不唯一),然后才进行时间筛选。如果person和code组合很多,或者每个组合下的交易和浏览记录都很多,这个中间DataFrame可能会非常庞大,占用大量内存并降低性能。
- 对于数据量较小或中等的情况,这种方法是完全可行的,并且不需要额外的库依赖。
两种方法都成功生成了预期的输出,为每笔交易关联了其发生前7天内的所有相关浏览记录,并将这些记录的日期和值聚合为列表:
date person code value1 view_dates view_values 0 2019-08-31 1 123 1 [2019-08-29, 2019-08-30] [1, 3] 1 2019-09-01 1 123 2 [2019-08-29, 2019-08-30, 2019-09-01] [1, 3, 5] 2 2019-09-04 2 456 3 [2019-08-31, 2019-09-01, 2019-09-03] [4, 7, 9]
可以看到,对于第一笔交易(2019-08-31, person 1, code 123),关联到了2019-08-29和2019-08-30的浏览记录。对于第二笔交易(2019-09-01, person 1, code 123),关联到了2019-08-29、2019-08-30和2019-09-01的浏览记录,这正是merge_asof无法直接实现的多对多关联需求。
总结本教程介绍了两种在Pandas中处理时间序列数据,实现特定时间窗口内多对多关联和聚合的方法:
- pyjanitor.conditional_join: 适用于需要进行复杂非等值连接的场景,尤其是在处理大型数据集时,其性能通常更优。它能够直接在多个条件(包括范围条件)下进行连接,避免了生成巨大的中间DataFrame。
- 纯 Pandas merge + 筛选: 适用于数据量较小或中等的场景,不需要额外库依赖。但其缺点在于可能生成一个非常大的中间DataFrame,从而影响性能和内存使用。
在实际应用中,建议优先考虑pyjanitor.conditional_join,特别是在处理大规模数据时,以获得更好的性能和更简洁的代码。如果项目严格限制外部依赖,且数据规模可控,纯Pandas方案也是一个可行的选择。无论选择哪种方法,将日期列正确转换为datetime对象是进行时间序列操作的关键前提。
以上就是高效处理Pandas时间序列数据:7天内事件关联与聚合的详细内容,更多请关注知识资源分享宝库其它相关文章!
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。