
在处理包含数十万甚至数百万行数据的大型pandas dataframe时,直接执行全局操作(如df.merge()、df.apply())或对每一行进行外部api请求,常常会导致以下问题:
- 内存溢出(Memory Error):一次性加载并处理所有数据可能超出系统可用内存,导致程序崩溃。
- 程序崩溃与耗时过长:复杂的计算或大量的I/O操作(如文件读写、网络请求)可能使程序长时间运行甚至无响应,最终因资源耗尽而崩溃。
- API速率限制(API Rate Limiting):许多外部API(如Google Maps API)对短时间内的请求数量有严格限制。若不加控制地发送大量请求,会导致请求被拒绝,甚至IP被暂时封禁。
- 难以恢复:一旦程序崩溃,之前已完成的工作可能丢失,需要从头开始,效率低下。
分批处理(Batch Processing)是解决这些问题的有效策略,它将大型任务分解为更小、更易管理的子任务。
分批处理核心原理分批处理的核心思想是将一个庞大的DataFrame逻辑上或物理上拆分成多个较小的子DataFrame(即“批次”)。然后,对每个批次独立执行所需的操作(如合并、应用函数、API请求),并将每个批次的结果进行收集或即时保存。
这种方法的好处在于:
- 降低内存压力:每次只处理一部分数据,减少了瞬时内存占用。
- 规避API限制:可以在每个批次处理之间引入延迟,以满足API的速率限制要求。
- 提高稳定性与可恢复性:即使某个批次处理失败,也只会影响当前批次,并且可以从上一个成功批次的结果处恢复。
- 便于调试:可以在小批次上测试代码,确保逻辑正确后再应用于整个数据集。
下面将通过一个具体的Python Pandas示例,演示如何对大型DataFrame进行分批处理,并模拟merge、apply操作以及外部API请求。
Post AI
博客文章AI生成器
50
查看详情
import pandas as pd
from sklearn.datasets import load_diabetes # 用于生成示例数据
import time # 用于模拟API请求延迟
import os # 用于文件路径操作
# --- 1. 数据准备与模拟 ---
# 假设我们有一个大型DataFrame
# 这里使用sklearn的diabetes数据集模拟,实际中替换为你的数据
df_large = pd.DataFrame(load_diabetes().data, columns=load_diabetes().feature_names)
# 为了模拟合并操作,添加一个唯一ID列
df_large['record_id'] = range(len(df_large))
# 模拟另一个需要合并的DataFrame
df_other = pd.DataFrame({
'record_id': range(len(df_large)),
'additional_info': [f'info_for_record_{i}' for i in range(len(df_large))]
})
# --- 2. 定义分批大小 ---
batch_size = 100 # 每批处理100行数据
# --- 3. 为DataFrame添加批次号列 ---
# 使用整数除法 // 来为每行分配一个批次号
df_large['batch_num'] = df_large.index // batch_size
# --- 4. 存储结果的准备 ---
# 可以选择将每个批次的结果追加到CSV文件,或先收集到列表中再合并
output_csv_path = 'processed_data_batched.csv'
# 如果文件已存在,先删除,确保从新开始
if os.path.exists(output_csv_path):
os.remove(output_csv_path)
print(f"开始处理大型DataFrame,总行数: {len(df_large)},批次大小: {batch_size}")
print(f"预计总批次数: {df_large['batch_num'].nunique()}")
# --- 5. 遍历批次并执行操作 ---
# 使用groupby('batch_num')可以方便地迭代每个批次
for i, batch_df in df_large.groupby('batch_num'):
current_batch_number = i + 1
total_batches = df_large['batch_num'].nunique()
print(f"\n--- 正在处理批次 {current_batch_number}/{total_batches} (行索引 {batch_df.index.min()} 到 {batch_df.index.max()}) ---")
# --- 5.1 模拟 df.merge 操作 ---
# 假设我们需要将 df_other 中的信息合并到当前批次
# 注意:如果 df_other 也很大,可能需要对其进行预处理或优化查询
batch_df = pd.merge(batch_df, df_other[['record_id', 'additional_info']], on='record_id', how='left')
print(f"批次 {current_batch_number} 完成合并操作。")
# --- 5.2 模拟 df.apply 操作 ---
# 例如,对某一列进行复杂的数值转换或字符串处理
def complex_calculation(row_data):
# 实际中这里会是更复杂的业务逻辑
return row_data['bmi'] * row_data['s1'] / 100 + 5
batch_df['calculated_feature'] = batch_df.apply(complex_calculation, axis=1)
print(f"批次 {current_batch_number} 完成 apply 操作。")
# --- 5.3 模拟对外部API的请求 ---
# 假设你需要根据批次中的每一行数据调用一个外部API(如Google Maps)
def call_external_api(row_data):
# 实际中这里会是 requests.get('your_api_endpoint', params={'param': row_data['some_column']})
# 为了避免短时间内发送过多请求,这里引入延迟
time.sleep(0.05) # 模拟API请求延迟,并控制速率
return f"API_result_for_record_{row_data['record_id']}"
# 对批次中的每一行调用API
batch_df['api_response'] = batch_df.apply(call_external_api, axis=1)
print(f"批次 {current_batch_number} 完成 {len(batch_df)} 个API请求。")
# --- 5.4 保存当前批次结果 ---
# 将当前批次的处理结果追加到CSV文件
# 对于第一个批次,写入标题行;后续批次只追加数据
if i == 0:
batch_df.to_csv(output_csv_path, mode='w', index=False, header=True)
else:
batch_df.to_csv(output_csv_path, mode='a', index=False, header=False)
print(f"批次 {current_batch_number} 结果已保存到 {output_csv_path}")
print("\n所有批次处理完成。")
# --- 6. 最终验证(可选) ---
# 如果需要,可以重新加载整个处理后的文件进行最终检查
final_processed_df = pd.read_csv(output_csv_path)
print("\n最终处理后的数据预览:")
print(final_processed_df.head())
print(f"最终文件包含 {len(final_processed_df)} 行数据。") 代码详解:
- 数据准备:创建了一个模拟的大型DataFrame df_large 和一个用于合并的 df_other。
- 定义批次大小:batch_size = 100 设置了每个批次处理的行数。
- 添加批次号:df_large['batch_num'] = df_large.index // batch_size 是核心。它利用整数除法将DataFrame的索引按batch_size分组,为每行分配一个批次号。例如,索引0-99的行批次号为0,索引100-199的行批次号为1,以此类推。
- 结果存储:指定了一个CSV文件路径output_csv_path。在循环中,每个批次处理完后,其结果会被追加到这个文件中。
- 遍历批次:df_large.groupby('batch_num') 是一个非常方便的方式来迭代每个批次。i 是批次号,batch_df 是当前批次的DataFrame。
-
批次内操作:
- df.merge:在batch_df上执行合并操作。
- df.apply:在batch_df上执行自定义函数操作。
- API请求:定义了一个call_external_api函数来模拟API调用,并通过time.sleep(0.05)引入延迟,以避免触发API速率限制。
- 保存结果:batch_df.to_csv() 用于将当前批次的结果保存到CSV文件。mode='w' 用于第一个批次(写入新文件并包含表头),mode='a' 用于后续批次(追加到文件末尾且不包含表头),这样可以逐步构建完整的输出文件。
-
选择合适的批次大小:
- 太小:导致过多的文件I/O或循环迭代开销,效率可能不高。
- 太大:可能再次遇到内存或API限制问题。
- 建议:从一个适中的值(如1000-10000行)开始测试,根据系统资源、API限制和操作复杂性进行调整。对于API请求,批次大小可能需要更小,并且需要更长的延迟。
-
API请求管理:
- 延迟:time.sleep() 是最简单的延迟方式,但可能导致总处理时间过长。
- 指数退避(Exponential Backoff):当API返回错误(如429 Too Many Requests)时,逐步增加重试的延迟时间,直到成功或达到最大重试次数。
- 并发请求:对于某些API,可以使用多线程或异步IO(如asyncio配合aiohttp)在限制范围内并行发送请求,提高效率,但这会增加代码复杂度。
- API密钥管理:确保API密钥安全,不要硬编码在代码中。
-
中间结果保存:
- 将每个批次的结果追加到CSV文件是一个非常健壮的方法。即使程序崩溃,已处理的数据也已保存,下次可以从断点继续。
- 如果数据量不是极端大,也可以将所有批次的结果先收集到一个列表中,最后再用pd.concat()合并一次性保存。
-
错误处理:
- 在for循环内部使用try-except块捕获批次处理过程中可能发生的错误(如API请求失败、数据转换错误),并记录错误信息,避免程序中断。
- 对于API请求,检查HTTP响应状态码是至关重要的。
-
内存优化:
- 在处理完一个批次后,如果不再需要原始的batch_df,可以考虑使用del batch_df并调用gc.collect()来显式释放内存(尽管Python的垃圾回收机制通常会自动处理)。
- 选择合适的数据类型(如int32代替int64)也能减少内存占用。
- 进度反馈:在循环中打印当前处理的批次号、进度百分比等信息,让用户了解任务的执行状态。
通过将大型Pandas DataFrame操作和外部API请求分解为可管理的小批次,我们可以有效规避内存限制、API速率限制,并显著提高数据处理的鲁棒性和效率。本教程提供的分批处理策略和代码示例,为处理大数据量和集成外部服务提供了实用的解决方案。在实际应用中,根据具体场景调整批次大小、API请求策略和错误处理机制,将能够构建出更加稳定和高效的数据处理流程。
以上就是大型Pandas DataFrame分批处理策略与API请求优化的详细内容,更多请关注知识资源分享宝库其它相关文章!
相关标签: python go 编码 大数据 app csv ai google api调用 csv文件 内存占用 并发请求 Python batch pandas 数据类型 for try Error 循环 线程 多线程 并发 异步 http 大家都在看: Python怎么将时间戳转换为日期_Python时间戳与日期转换指南 Python 列表元素交换:len() 函数、负索引与Pythonic实践 Python怎么安装pip_Python包管理工具pip安装指南 python怎么将数据写入CSV文件_python CSV文件写入操作指南 交换列表中首尾元素的Python方法详解






发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。