
在处理包含数十万甚至数百万行数据的大型pandas dataframe时,直接对整个数据集执行复杂操作,如df.merge、df.apply,尤其是涉及外部api调用(例如google maps api)时,常常会导致程序崩溃、内存溢出或因api限流而耗时过长。为了解决这些问题,采用分批处理(batch processing)是一种高效且稳健的策略。本文将详细介绍如何将大型dataframe分批处理,并优化外部api调用,实现数据的高效与稳定处理。
一、挑战:大型DataFrame与外部API调用大型DataFrame在内存中占用大量资源,一次性加载和处理可能超出系统内存限制。同时,对每一行或每一组数据发起独立的外部API请求,会面临以下问题:
- API速率限制(Rate Limiting):大多数公共API都有每秒、每分钟或每天的请求次数限制。短时间内大量请求会导致API拒绝服务。
- 网络延迟与开销:每次API调用都涉及网络通信,累积的延迟会显著增加总处理时间。
- 程序稳定性:长时间运行的复杂操作更容易因内存不足、网络瞬断或其他未知错误而崩溃,且难以恢复。
分批处理的核心思想是将大型DataFrame分解成若干个较小的、可管理的子集(批次),然后对每个批次独立进行处理。这种方法带来了多重优势:
- 内存优化:每次只加载和处理一个批次的数据,显著降低内存占用。
- API限流管理:可以在处理每个批次之间引入延迟,以遵守API的速率限制。
- 提高容错性:如果某个批次处理失败,可以更容易地识别问题并重新处理该批次,而不是从头开始。
- 增量写入:处理完一个批次后,可以立即将结果写入文件(如CSV),即使程序中断,已处理的数据也不会丢失。
我们将通过一个模拟场景来演示如何分批处理大型DataFrame,其中包含模拟的apply操作和外部API调用,并将结果增量写入CSV文件。
1. 创建模拟数据首先,我们创建一个大型的模拟DataFrame,包含一个需要通过API获取信息的“地址”列。
import pandas as pd
import numpy as np
import time
import os
# 创建一个大型模拟DataFrame
data_size = 500000 # 50万行数据
df = pd.DataFrame({
'id': range(data_size),
'value1': np.random.rand(data_size) * 100,
'value2': np.random.randint(1, 1000, data_size),
'address': [f"模拟地址 {i}, 城市A, 国家B" for i in range(data_size)] # 模拟地址信息
})
print(f"原始DataFrame大小: {len(df)} 行") 2. 定义批次大小并标记批次
确定一个合适的批次大小(例如100行或1000行),然后为DataFrame中的每一行分配一个批次编号。
Post AI
博客文章AI生成器
50
查看详情
batch_size = 1000 # 每批处理1000行
df['batch_num'] = df.index // batch_size
# 打印批次信息
print(f"数据将被分割成 {df['batch_num'].nunique()} 个批次,每批 {batch_size} 行。") 3. 模拟外部API调用与数据处理函数
定义一个函数来模拟外部API调用,并引入延迟以模拟网络请求和API限流。同时,定义一个函数来处理每个批次的数据,包括apply操作和API调用。
# 模拟外部API调用函数
def get_coordinates_from_address(address):
"""
模拟一个外部API调用,根据地址获取经纬度。
在实际应用中,这里会是调用Google Maps API等。
"""
time.sleep(0.01) # 模拟API请求的延迟,例如10毫秒
# 模拟返回经纬度数据
lat = np.random.uniform(30, 40)
lon = np.random.uniform(-100, -90)
return f"Lat:{lat:.4f}, Lon:{lon:.4f}"
# 定义批次数据处理函数
def process_data_chunk(chunk_df):
"""
对单个数据批次执行复杂的apply操作和API调用。
"""
# 示例1: 执行一个复杂的apply操作
chunk_df['processed_value'] = chunk_df['value1'] * 0.5 + chunk_df['value2'] / 10
# 示例2: 对地址列进行API调用
# 注意:如果API支持批量查询,应优先使用批量查询以减少网络开销
# 这里为了演示,我们假设API是按行调用的
chunk_df['coordinates'] = chunk_df['address'].apply(get_coordinates_from_address)
# 示例3: 模拟一个merge操作 (如果需要与其他DataFrame合并)
# 假设有一个小型配置DataFrame需要合并
# config_df = pd.DataFrame({'id': [0, 1, 2], 'config_info': ['A', 'B', 'C']})
# chunk_df = pd.merge(chunk_df, config_df, on='id', how='left')
return chunk_df 4. 迭代批次并增量写入
现在,我们可以遍历所有批次,对每个批次进行处理,并将结果增量写入同一个CSV文件。
output_csv_path = 'processed_large_dataframe.csv'
# 如果输出文件已存在,先删除,确保从头开始写入
if os.path.exists(output_csv_path):
os.remove(output_csv_path)
print(f"已删除旧的输出文件: {output_csv_path}")
header_written = False # 标记是否已写入CSV头部
print(f"\n开始分批处理 {len(df)} 行数据并写入 {output_csv_path}...")
unique_batches = df['batch_num'].unique()
total_batches = len(unique_batches)
for i, batch_id in enumerate(unique_batches):
# 提取当前批次的数据
current_batch_df = df[df['batch_num'] == batch_id].copy() # 使用 .copy() 避免 SettingWithCopyWarning
print(f"正在处理批次 {i+1}/{total_batches} (行范围: {current_batch_df.index.min()} - {current_batch_df.index.max()})")
# 处理当前批次的数据
processed_batch = process_data_chunk(current_batch_df)
# 将处理后的批次数据写入CSV文件
if not header_written:
# 首次写入,包含头部
processed_batch.to_csv(output_csv_path, mode='w', index=False, encoding='utf-8')
header_written = True
else:
# 后续写入,不包含头部,以追加模式写入
processed_batch.to_csv(output_csv_path, mode='a', header=False, index=False, encoding='utf-8')
# 可选:在批次之间引入额外的延迟,以更严格地遵守API速率限制
# time.sleep(0.5) # 例如,每处理完一个批次暂停0.5秒
print(f"\n所有批次处理完成,结果已写入 {output_csv_path}")
# 验证写入结果 (可选)
# processed_df = pd.read_csv(output_csv_path)
# print(f"\n从CSV读取的数据总行数: {len(processed_df)}")
# print("前5行数据示例:")
# print(processed_df.head()) 四、注意事项与优化建议
- 选择合适的批次大小:批次大小的选择取决于您的系统内存、API限流策略以及操作的复杂性。过小的批次会增加迭代和文件I/O的开销;过大的批次则可能再次引入内存或API问题。建议通过实验找到最佳平衡点。
-
API限流与错误处理:
- 延迟:在批次处理之间添加 time.sleep() 是最直接的限流方式。
- 重试机制:对于外部API调用,应实现健壮的重试逻辑,例如使用 tenacity 库,在API返回429(Too Many Requests)或5xx错误时自动重试。
- 批量API请求:如果API支持,优先使用批量请求接口,一次性发送多个数据点的请求,这能显著减少网络往返次数和总延迟。
-
增量写入的考量:
- 文件模式:mode='w' 用于首次写入(创建文件并写入头部),mode='a' 用于后续追加(不写入头部)。
- 索引:index=False 避免将DataFrame的索引作为一列写入CSV。
- 编码:指定 encoding='utf-8' 以避免字符编码问题。
- 并行处理:对于CPU密集型或大量API请求的场景,可以考虑使用 multiprocessing 模块将批次处理任务分配给多个CPU核心或进程并行执行。但这会增加代码复杂性,且需要更精细的API限流管理。
- 内存管理:在处理完一个批次后,如果不再需要原始批次数据,可以考虑使用 del 语句释放内存,或者确保变量超出作用域后被垃圾回收。
- 中间结果保存:如果处理流程非常漫长,可以考虑在每个批次处理后,不仅写入最终结果,还保存一些关键的中间状态,以便在程序崩溃后能从最近的检查点恢复。
通过将大型Pandas DataFrame分解为可管理的小批次进行处理,我们能够有效地规避内存限制、遵守API速率,并提高数据处理的整体稳定性和效率。这种分批处理结合增量写入的策略,是处理海量数据和外部服务交互时的最佳实践之一,尤其适用于那些需要长时间运行且对资源消耗敏感的数据管道。遵循本文提供的指南和代码示例,您可以构建出更加健壮和高效的数据处理解决方案。
以上就是高效处理大型DataFrame:Pandas分批操作与外部API请求管理的详细内容,更多请关注知识资源分享宝库其它相关文章!
相关标签: go 编码 app csv google 作用域 api调用 csv文件 内存占用 batch pandas 接口 作用域 大家都在看: 将字节流转换为 Go 语言中的 float32 数组 Go 语言:从字节数据高效还原 float32 数组的实践指南 Go语言中高效转换字节序列为Float32数组的指南 Go语言中将字节流转换为Float32数组的实用指南 Go语言中字节切片高效转换为Float32浮点数数组的指南






发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。