
在处理包含数十万甚至数百万行数据的大型pandas dataframe时,直接对整个数据集执行复杂操作(如df.merge、df.apply)或频繁调用外部api(如google maps api),极易引发内存溢出、程序崩溃或因api请求频率过高而被限流等问题。尤其当每次api调用耗时且有严格的速率限制时,一次性处理所有数据几乎不可行。解决这些问题的关键在于采用分批处理(batch processing)策略。
2. 核心策略:数据分批处理分批处理的核心思想是将一个庞大的数据集分解成若干个大小可控的小数据集(批次),然后逐一处理这些批次。这种方法不仅能有效降低单次操作的内存消耗,还能更好地管理外部API的调用频率。
2.1 分批逻辑实现我们可以利用DataFrame的索引(df.index)结合整数除法(//)来为每一行分配一个批次编号。例如,如果希望每100行作为一个批次,那么df.index // 100就能生成相应的批次号。
以下是一个演示如何创建批次并迭代处理的示例代码:
import pandas as pd
import numpy as np
import time
import os
# 模拟一个大型DataFrame
# 实际应用中,这里会是您加载的50万行数据
data_size = 500000
df = pd.DataFrame({
'id': range(data_size),
'col_a': np.random.rand(data_size) * 100,
'address': [f'Address {i}, City {i % 100}' for i in range(data_size)],
'value_b': np.random.randint(0, 1000, data_size)
})
print(f"原始DataFrame大小: {len(df)} 行")
# 定义批次大小
batch_size = 100
# 为DataFrame中的每一行生成批次号
df['batch_num'] = df.index // batch_size
# 模拟一个外部API调用函数
def call_google_maps_api(address):
"""
模拟调用Google Maps API,获取经纬度
实际应用中,这里会是您的requests.get()调用
"""
# 模拟网络延迟和API处理时间
time.sleep(0.05) # 每次调用暂停50毫秒,以避免过快请求
if "City 0" in address: # 模拟某些地址可能失败
# raise ValueError(f"API Error for address: {address}")
return f"ERROR: {address}"
return f"Lat: {hash(address) % 90}, Lng: {hash(address) % 180}"
# 存储最终结果的列表
# 也可以直接写入CSV,下面会介绍两种方式
processed_batches = []
output_csv_path = 'processed_data_batched.csv'
# 如果输出文件已存在,先删除,确保从头开始
if os.path.exists(output_csv_path):
os.remove(output_csv_path)
print(f"已删除旧的输出文件: {output_csv_path}")
# 遍历所有唯一的批次号
unique_batches = df['batch_num'].unique()
for i, batch_id in enumerate(unique_batches):
print(f"正在处理批次 {i+1}/{len(unique_batches)} (批次号: {batch_id})...")
# 提取当前批次的DataFrame
# 使用 .copy() 避免SettingWithCopyWarning
current_batch_df = df[df['batch_num'] == batch_id].copy()
# --- 在此处对 current_batch_df 执行您的操作 ---
# 1. 模拟 df.merge 操作 (例如,与另一个小表合并)
# 假设有一个小的查找表
lookup_data = pd.DataFrame({
'id': range(data_size),
'category': [f'Cat_{i % 5}' for i in range(data_size)]
})
# 只合并当前批次所需的查找数据
current_batch_df = pd.merge(current_batch_df, lookup_data[['id', 'category']], on='id', how='left')
# 2. 模拟 df.apply 操作,其中包含外部API调用
# 针对 'address' 列调用模拟的Google Maps API
try:
current_batch_df['coordinates'] = current_batch_df['address'].apply(call_google_maps_api)
except Exception as e:
print(f"批次 {batch_id} API调用失败: {e}")
# 可以在这里实现重试逻辑或记录错误
current_batch_df['coordinates'] = "API_CALL_FAILED" # 标记失败
# 3. 其他数据转换或计算
current_batch_df['calculated_col'] = current_batch_df['col_a'] * 2
# --- 批次处理结束 ---
# 将处理后的批次数据添加到列表中
# processed_batches.append(current_batch_df)
# 替代方案:直接将批次结果写入CSV文件
# 对于第一个批次,写入头部;对于后续批次,不写入头部并以追加模式写入
if i == 0:
current_batch_df.to_csv(output_csv_path, mode='w', index=False, header=True, encoding='utf-8')
else:
current_batch_df.to_csv(output_csv_path, mode='a', index=False, header=False, encoding='utf-8')
# 释放内存 (可选,对于极大的DataFrame可能有用)
del current_batch_df
import gc
gc.collect()
print("\n所有批次处理完成!")
# 如果您选择将所有批次收集到列表中,最后再合并
# final_df = pd.concat(processed_batches, ignore_index=True)
# print(f"最终合并的DataFrame大小: {len(final_df)} 行")
# final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
print(f"处理后的数据已保存到: {output_csv_path}")
final_df_check = pd.read_csv(output_csv_path)
print(f"从CSV读取的数据行数: {len(final_df_check)}") 2.2 处理流程与结果合并
在上述示例中,我们展示了两种处理批次结果的方式:
Post AI
博客文章AI生成器
50
查看详情
- 收集到列表再合并(注释掉的部分):将每个处理后的current_batch_df添加到processed_batches列表中。在所有批次处理完毕后,使用pd.concat(processed_batches, ignore_index=True)将所有批次合并成一个完整的DataFrame。这种方式适用于最终需要一个完整DataFrame进行后续操作的场景,但会占用更多内存。
-
直接追加写入CSV:这是处理大数据集更推荐的方法,尤其是在内存受限或最终目标是生成一个CSV文件时。
- 对于第一个批次(i == 0),使用mode='w'(写入模式)和header=True来创建文件并写入列头。
- 对于后续批次(i > 0),使用mode='a'(追加模式)和header=False来将数据追加到现有文件末尾,且不再写入列头。 这种方式能有效避免将所有处理结果同时加载到内存中,从而节省大量内存资源。
当分批处理涉及到外部API调用时,必须特别注意API的速率限制(Rate Limiting)和错误处理。
- 速率限制:大多数API都有每秒、每分钟或每天的请求次数限制。在上述示例中,我们通过time.sleep(0.05)模拟了每次API调用后的延迟,以控制请求频率。实际应用中,您可能需要根据API提供商的具体要求,设置更长的延迟或实现指数退避(Exponential Backoff)策略,即在API返回错误(如429 Too Many Requests)时,等待更长时间再重试。
- 错误处理:将API调用放在try-except块中,可以捕获网络错误、API响应错误等,并进行相应的处理,例如记录错误、跳过当前条目、使用默认值或实现重试机制。
- 缓存中间结果:对于耗时或高频调用的API,可以考虑在本地缓存API响应。如果同一地址被多次查询,可以直接从缓存中获取结果,减少实际的API请求。
- 批次大小的选择:没有一劳永逸的批次大小。它取决于您的系统内存、数据复杂性、API限制和处理逻辑的计算成本。通常建议从一个较小的批次(如1000或5000行)开始测试,逐步调整以找到最佳平衡点。
- 内存管理:在每次循环结束时,如果current_batch_df不再需要,可以显式使用del current_batch_df并调用gc.collect()来帮助Python的垃圾回收器释放内存。
- 进度保存与恢复:对于耗时数小时甚至数天的任务,考虑在每次批次处理完成后,记录已处理的批次号或将中间结果保存到临时文件。这样,如果程序意外中断,您可以从中断点恢复,而不是从头开始。
- 并行处理(高级):如果API调用是I/O密集型且可以并行执行,可以考虑使用Python的multiprocessing或concurrent.futures模块来并行处理多个批次。但这会增加代码复杂性,并需要更精细的API速率控制。
- 异常处理和日志记录:在实际生产环境中,为API调用和数据处理逻辑添加详细的异常处理和日志记录,有助于调试和监控程序的运行状态。
通过采用分批处理策略,我们能够有效地管理大型DataFrame的数据处理任务,避免因内存或API限制导致的程序崩溃。核心在于将数据分解为可管理的批次,并在每个批次内部执行所需的合并、计算和API调用。结合适当的API请求管理和错误处理机制,以及将结果增量写入文件,可以显著提升数据处理的稳定性和效率,确保即使面对海量数据和外部服务依赖,也能顺利完成任务。
以上就是Pandas大数据框分批处理与外部API调用优化实践的详细内容,更多请关注知识资源分享宝库其它相关文章!
相关标签: python go 大数据 app csv ai google 性能瓶颈 优化实践 api调用 csv文件 垃圾回收器 Python batch pandas try 循环 大家都在看: Python 实战:招聘网站数据分析案例 python中怎么进行类型转换_Python常见数据类型转换方法 Python解释器解析器中无限循环错误的诊断与修复 Python 实战:猜数字小游戏 Python Web Scraping技巧:处理同名类标签并精确筛选数据






发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。