
openai api为了确保服务的公平性和稳定性,对不同账户和模型设定了严格的速率限制(rate limits)。这些限制通常以每分钟请求数(rpm - requests per minute)和每分钟令牌数(tpm - tokens per minute)的形式体现。当您的应用程序在短时间内发出的请求超过了这些预设的限制时,api会返回rate_limit_exceeded错误。
对于许多开发者而言,理解哪些操作会被计入速率限制至关重要。例如,一个常见的误解是,只有显式地创建消息或运行(client.beta.threads.messages.create、client.beta.threads.runs.create)才会计入请求数。然而,实际上,许多辅助性操作,如轮询Run的状态,同样会消耗您的请求配额。
Run状态轮询:隐藏的速率消耗者在使用OpenAI Assistants API时,一个典型的流程是:
- 创建文件(client.files.create)
- 创建消息(client.beta.threads.messages.create)
- 创建运行(client.beta.threads.runs.create)
- 轮询运行状态直到完成(client.beta.threads.runs.retrieve)
- 获取结果(client.beta.threads.messages.list)
问题通常出现在第四步:轮询运行状态。为了确定助手是否已完成其任务,我们需要反复调用client.beta.threads.runs.retrieve来检查run.status。这个retrieve操作本身就是一次API请求,它会实时计入您的速率限制。
考虑以下场景:如果您的速率限制是每分钟3个请求(3 RPM),并且您在每次主请求之间加入了20秒的time.sleep(20)。这看起来足以将主请求频率控制在3 RPM以内。然而,如果在每次主请求的内部,您又在一个while循环中频繁地调用client.beta.threads.runs.retrieve来检查状态,那么这些内部的retrieve调用会迅速耗尽您的请求配额。
例如,一个Run可能需要几秒钟甚至更长时间才能完成。在这期间,while循环可能会每秒钟执行一次retrieve调用。即使一个Run只持续10秒,也可能产生10次额外的API请求。如果您的主请求间隔是20秒,而内部轮询在短时间内产生了大量请求,总请求数很容易超过每分钟3次的限制。这就是为什么有时程序会在处理第一个文件时失败,有时在第三个文件时失败,因为失败的时机取决于Run的实际处理时长以及内部轮询的频率。
优化速率限制管理策略为了有效管理和规避这种类型的速率限制问题,核心在于控制所有API调用的频率,而不仅仅是主操作。
1. 在轮询循环中引入策略性延迟最直接的解决方案是在while循环内部,每次调用client.beta.threads.runs.retrieve之后,也加入一个适当的延迟。这个延迟应该足够长,以确保即使在最坏情况下,内部轮询和外部主请求的总频率也不会超过速率限制。
Teleporthq
一体化AI网站生成器,能够快速设计和部署静态网站
182
查看详情
假设您的速率限制是3 RPM,这意味着平均每20秒才能发出一个请求。如果一个Run平均需要10秒完成,并且您希望在这10秒内只进行少量状态检查,那么每次轮询之间可以设置一个较长的延迟。
示例代码修改:
import pandas as pd
import time
from openai import OpenAI
client = OpenAI(api_key = "[MY API KEY]")
# 建议为每个文件创建一个新的线程,以避免线程内容积累和混淆
# thread = client.beta.threads.create() # 移到循环内部
assistant = client.beta.assistants.create(
name = "Nomination Hearing Identifier",
instructions = "Given a complete transcript of a US Senate hearing, determine if this hearing was or was not a nomination hearing. Respond with only 'YES' or 'NO' and do not provide justification.",
tools = [{"type": "retrieval"}],
model = "gpt-3.5-turbo-1106"
)
files = ["CHRG-108shrg1910401.txt","CHRG-108shrg1910403.txt", "CHRG-108shrg1910406.txt", "CHRG-108shrg1910407.txt", "CHRG-108shrg1910408.txt", "CHRG-108shrg1910409.txt", "CHRG-108shrg1910410.txt", "CHRG-108shrg1910411.txt", "CHRG-108shrg1910413.txt", "CHRG-108shrg1910414.txt"]
jacket_classifications = pd.DataFrame(columns = ["jacket", "is_nomination"])
for file in files:
# 为每个文件创建一个新的线程,确保隔离性
thread = client.beta.threads.create()
gpt_file = client.files.create(
file = open(file, "rb"),
purpose = 'assistants'
)
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="Determine if the transcript in this file does or does not describe a nomination hearing. Respond with only 'YES' or 'NO' and do not provide justification.",
file_ids=[gpt_file.id]
)
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id,
)
# 在这里引入一个更长的初始等待,以避免立即开始频繁轮询
print(f"Waiting for run {run.id} to complete for file {file}...")
# time.sleep(5) # 可以在这里加一个初始等待,但更重要的是循环内的等待
while run.status != "completed":
# 每次轮询前都进行等待,确保retrieve调用频率受控
# 假设每次retrieve调用需要至少20秒的间隔来满足3 RPM的限制
# 如果Run本身很快,可以适当缩短,但要保守估计
print(f"Run status: {run.status}. Sleeping for 10 seconds before next check.")
time.sleep(10) # 关键:在每次retrieve调用前等待
run = client.beta.threads.runs.retrieve(
thread_id=thread.id,
run_id=run.id
)
if run.status == "failed":
print(f"Run failed for file {file}: {run.last_error}")
# 可以在这里添加重试逻辑或跳过当前文件
break # 跳出当前文件的轮询循环
if run.status == "completed":
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
output = messages.data[0].content[0].text.value
is_nomination = 0 # 默认值
if "yes" in output.lower(): # 统一转换为小写进行判断
is_nomination = 1
row = pd.DataFrame({"jacket":[file], "is_nomination":[is_nomination]})
jacket_classifications = pd.concat([jacket_classifications, row], ignore_index=True) # 使用ignore_index=True
print(f"Processed file {file}. Result: {output}")
else:
print(f"Skipping file {file} due to failed run.")
# 外部循环的延迟可以根据整体请求频率和模型处理速度调整
# 如果内部轮询已经有了足够的延迟,这里可以根据需要调整
print("Sleeping 20 seconds before processing next file to ensure overall API call rate limit not surpassed.")
time.sleep(20) # 确保下一个文件的初始请求不会立即触发速率限制
jacket_classifications.to_csv("[MY FILE PATH]/test.csv", index=False) # index=False避免写入额外索引列
print("Processing complete. Results saved to CSV.") 代码改进说明:
- 内部轮询延迟: 在while run.status != "completed"循环内部,每次调用client.beta.threads.runs.retrieve之前添加time.sleep(10)。这个值需要根据您的具体速率限制和Run的平均完成时间进行调整。目标是确保retrieve调用的频率低于速率限制。
- 线程管理: 将thread = client.beta.threads.create()移到for循环内部。虽然原始问题不直接与此相关,但在处理多个独立文件时,为每个文件创建新线程是更好的实践,可以避免上下文混淆和潜在的令牌使用问题。
- 错误处理: 增加了对run.status == "failed"的检查,以便在Run失败时能够优雅地处理。
- 字符串比较: if "yes" in output.lower(): 使得判断不区分大小写,更健壮。
- DataFrame拼接: pd.concat时使用ignore_index=True是一个好的实践,可以避免索引重复。
- CSV保存: index=False可以避免将DataFrame的索引写入CSV文件。
对于更健壮的生产系统,建议使用指数退避策略来处理速率限制。当API返回速率限制错误时,不是立即重试,而是等待一个逐渐增长的时间间隔后再重试。许多Python库(如tenacity)都提供了开箱即用的指数退避功能。
3. 监控API使用情况定期查看OpenAI平台上的账户使用情况和速率限制仪表板(https://www.php.cn/link/2d00ce98adf1abcedcf3cecb0859343a。
总结与最佳实践- 理解所有API调用: 明确知道哪些操作会计入您的API请求配额,即使是看似辅助性的操作(如状态轮询)。
- 策略性延迟: 在所有可能导致高频率API调用的循环中,尤其是轮询操作,引入适当的time.sleep延迟。
- 指数退避: 在生产环境中,结合指数退避机制来处理临时的速率限制错误,提高应用程序的韧性。
- 监控与调整: 定期检查您的API使用情况,并根据OpenAI的速率限制政策和您的实际需求调整代码中的延迟参数。
- 优化请求设计: 尽量减少不必要的API调用。例如,如果可能,考虑批量处理数据以减少API请求次数,但这在Assistants API的Run模式下可能不直接适用。
通过上述策略,您可以更有效地管理OpenAI API的速率限制,确保您的应用程序在扩展时能够稳定、可靠地运行。
以上就是OpenAI API速率限制管理:理解并优化Run状态轮询机制的详细内容,更多请关注知识资源分享宝库其它相关文章!
相关标签: python csv ai openai gpt 常见问题 api调用 csv文件 为什么 Python if for while 字符串 循环 线程 Thread https 大家都在看: Python单元测试:正确模拟类方法内部条件调用 深入理解Python zip对象:一次性遍历的特性与数据复用策略 Python zip 对象:理解其迭代器特性与多次遍历策略 Python 实战:二手车价格分析项目 Python单元测试:正确Mock类方法中条件分支的内部函数调用






发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。