1、文档

openai:
官方文档:https://platform.openai.com/docs/quickstart
官方python文档:https://github.com/openai/openai-python
batch批量请求:

  • https://platform.openai.com/docs/guides/batch/overview
  • https://github.com/SpellcraftAI/oaib

2、单个请求

  1. client.chat.completions
    import os
    
    # 设置环境变量
    os.environ['OPENAI_API_KEY'] = 'you_key' #用你的openai key
    # 验证环境变量是否设置成功
    print(os.getenv('OPENAI_API_KEY'))
    
    from openai import OpenAI
    client = OpenAI()
    stream=True
    completion = client.chat.completions.create(
        model="gpt-4",
        max_tokens=200,
        temperature=0.5,
        messages=[
            # {"role": "system", "content": "You are a helpful assistant."},
            # {
            #     "role": "user",
            #     "content": "Write a haiku about recursion in programming."
            # },
            {"role": "system", "content": "你是一位作家."},
             {
                "role": "user",
                "content": "写一篇小说,关于猫的."
            }
        ],
        stream=stream
    )
    
    
    if stream:
        output_text = ''
        for chunk in completion:
            # print(chunk.choices[0].delta.content,end='')
            print(chunk.choices[0].delta.content or "", end="")
            content= chunk.choices[0].delta.content
            if content:  # 检查 content 是否为 None
                output_text +=content
        print('\n','*'*100)
        print('接收到内容:\n',output_text)
    
    else:
        print(completion)
        print(completion.choices[0].message)
    

2.requests url的方式,参考官方的curl,可先在postman上试

import requests
import json

url = "https://api.openai.com/v1/chat/completions"

payload = json.dumps({
  "model": "gpt-4",
  "max_tokens":200,
    "temperature":0.5,
  "messages": [
    {
      "role": "system",
      "content": "You are a helpful assistant."
    },
    {
      "role": "user",
      "content": "讲个故事,200字."
    }
  ],
  "stream": False,
  # "response_format":"json"
})
headers = {
  'Authorization': 'Bearer sk-you_key', ##用你的openai key,前面加上Bearer 
  'Content-Type': 'application/json',
 # 'Cookie': '__' #postman生成,或者不要
}

response = requests.request("POST", url, headers=headers, data=payload,stream=True)

print('类型:',type(response))
print(response.text)

#流
# # 遍历响应内容,要转换byte
# for line in response.iter_lines():
#     if line:
#         # 对每一行进行解码并处理
#         line_decoded = line.decode('utf-8').strip()
#         if line_decoded.startswith('data:'):
#             # 去掉 "data: " 前缀并解析为 JSON
#             data = line_decoded[5:].strip()
#             try:
#                 json_data = json.loads(data)
#                 print(json_data)
#                 # print(json_data['choices'][0]['delta']['content']or "", end="")
#                 print(json_data['choices'][0]['delta']['content'] or "", end="")
#             except ValueError as e:
#                 print(f"Invalid JSON: {data}")

3、batch批量请求

  1. 使用oaib库,https://github.com/SpellcraftAI/oaib

    from oaib import Auto
    import asyncio
    import os
    # 设置环境变量
    os.environ['OPENAI_API_KEY'] = 'sk-you_key'
    # 验证环境变量是否设置成功
    print(os.getenv('OPENAI_API_KEY'))
    
    async def get_batch():
         # Automatically set rate limits.
        batch = Auto(workers=8)
    
        # Fetch 1,000 chat completions as quickly as possible, setting rate limits
        # automatically from OpenAI's response headers.
        for i in range(5):
            await batch.add(
                "chat.completions.create", 
                model="gpt-4",
                max_tokens=200,
                temperature=0.5,
                messages=[{"role": "user", "content": "讲个故事"}]
            )
        resp= await batch.run()
        print('类型:',type(resp))
        print('返回:',resp)
    
        for i in resp['result'].tolist():
            print(type(i),i)
        
        # return batch
    
    if __name__ == "__main__":
       
        # batch=get_batch()
        # await batch.run()
    
        # get_batch()
        asyncio.run(get_batch())
    
  2. aiohttp和 asyncio实现异步批量

import aiohttp
import asyncio

# 要请求的URL
url = "https://api.openai.com/v1/chat/completions"

headers = {
  'Authorization': 'Bearer sk-you_key',
  'Content-Type': 'application/json',
  # 'Cookie': '__cf_bm=fGp5'
}

# 要发送的数据
# data_list = [{"name":"urm1","data":"post请求数据1"},{"name":"urm2","data":"post请求数据2"},{"name":"urm3","data":"post请求数据3"}]

data=['讲个故事','讲个笑话','分析股市']
# data_list=[ item['messages'][0]['content']==i for i in data]
data_list=[ {
  "model": "gpt-4",
  "max_tokens":200,
  "temperature":0.5,
  "messages": [
    {
      "role": "system",
      "content": "You are a helpful assistant."
    },
    {
      "role": "user",
      "content": i
    }
  ],
  "stream": False,
  # "response_format":"json"
} for i in data]

print(data_list)
# quit('测试')

async def fetch_data(session, data):
    try:
        async with session.post(url, json=data, headers=headers) as response:
            response.raise_for_status() # 确保响应状态码为2xx,否则引发HTTPError
            return await response.json()
    except aiohttp.ClientError as e:
        print(f"请求失败: {e}")
        return None

async def main():
    con = aiohttp.TCPConnector(ssl=False) #ssl问题
    async with aiohttp.ClientSession(connector=con, trust_env=True) as session:
        tasks = [fetch_data(session, data) for data in data_list]
        results = await asyncio.gather(*tasks)
        for result in results:
            if result:
                print(result)

# 运行主程序
if __name__ == '__main__':
    asyncio.run(main())

4、aiohttp和 asyncio实现异步批量

1、client

import aiohttp
import asyncio

# 要请求的URL
url = 'http://172.31.208.3:8057/testp'

# 要发送的数据
data_list = [{"name":"urm1","data":"post请求数据1"},{"name":"urm2","data":"post请求数据2"},{"name":"urm3","data":"post请求数据3"}]

# {"name":"urm1","data":"post请求数据1"}

# 自定义请求头
# headers = {
#     'User-Agent': 'my-app',
#     'Authorization': 'Bearer YOUR_TOKEN'
# }
headers = {
  'Content-Type': 'application/json'
}

async def fetch_data(session, data):
    try:
        async with session.post(url, json=data, headers=headers) as response:
            response.raise_for_status() # 确保响应状态码为2xx,否则引发HTTPError
            return await response.json()
    except aiohttp.ClientError as e:
        print(f"请求失败: {e}")
        return None

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, data) for data in data_list]
        results = await asyncio.gather(*tasks)
        for result in results:
            if result:
                print(result)

# 运行主程序
if __name__ == '__main__':
    asyncio.run(main())

2、server


from typing import Union

from fastapi import FastAPI
import uvicorn
from pydantic import BaseModel,Field
from typing import Union,Optional,Literal

app = FastAPI()


# 1、get请求

@app.get("/")
def read_root():
    return {"Hello": "World"}

@app.get("/testg")
def read_root(name: str, content: str):
    print('get测试:',name,'和',content)
    return {"name":name, "include":content}

class Item(BaseModel):
    data: str
    name: str


@app.post("/testp")
async def update_item(item: Item):
    print('post测试:',item.name,'和',item.data)

    results = {"name":item.name, "data":item.data,"item":item}
    return results


if __name__ == "__main__":
    #方法一:
    # config = uvicorn.Config("main:app", host='0.0.0.0',port=8888, reload=True, log_level="info") #指定模块,当前用FastAPI()的app
    config = uvicorn.Config('getpost:app',host='0.0.0.0',port=8888, reload=True, log_level="info")
    # config = uvicorn.Config(app,host='0.0.0.0',port=8888, reload=True, log_level="info") #用app,只有在不使用多处理(worker=NUM)或重新加载(reload=True)的情况下,此样式才有效,因此我们建议使用导入字符串样式
    server = uvicorn.Server(config)
    server.run()
    
    #方法二:
    # from pathlib import Path
    # uvicorn.run('getpost:app',host='0.0.0.0',port=8888, reload=True, log_level="info")

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部