update: 修改requests为aiohttp,实现异步http请求

This commit is contained in:
killua4396 2024-05-18 11:09:44 +08:00
parent b26f3192bc
commit f426878f9e
1 changed files with 80 additions and 77 deletions

View File

@ -10,7 +10,7 @@ from config import get_config
import uuid import uuid
import json import json
import asyncio import asyncio
import requests import aiohttp
# 依赖注入获取logger # 依赖注入获取logger
logger = get_logger() logger = get_logger()
@ -266,35 +266,36 @@ async def sct_llm_handler(ws,session_id,response_type,llm_info,tts_info,db,redis
'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}", 'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
response = requests.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload,stream=True) #调用大模型
except Exception as e: except Exception as e:
logger.error(f"llm调用发生错误: {str(e)}") logger.error(f"编辑http请求时发生错误: {str(e)}")
try: try:
for chunk in response.iter_lines(): async with aiohttp.ClientSession() as client:
chunk_data = parseChunkDelta(chunk) async with client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload) as response: #发送大模型请求
is_end = chunk_data == "end" async for chunk in response.content.iter_any():
if not is_end: chunk_data = parseChunkDelta(chunk)
llm_response += chunk_data is_end = chunk_data == "end"
sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end) #断句 if not is_end:
for sentence in sentences: llm_response += chunk_data
if response_type == RESPONSE_TEXT: sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end) #断句
response_message = {"type": "text", "code":200, "msg": sentence} for sentence in sentences:
await ws.send_text(json.dumps(response_message, ensure_ascii=False)) #返回文本信息 if response_type == RESPONSE_TEXT:
elif response_type == RESPONSE_AUDIO: response_message = {"type": "text", "code":200, "msg": sentence}
sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True) await ws.send_text(json.dumps(response_message, ensure_ascii=False)) #返回文本信息
response_message = {"type": "text", "code":200, "msg": sentence} elif response_type == RESPONSE_AUDIO:
await ws.send_bytes(audio) #返回音频数据 sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
await ws.send_text(json.dumps(response_message, ensure_ascii=False)) #返回文本信息 response_message = {"type": "text", "code":200, "msg": sentence}
logger.debug(f"websocket返回: {sentence}") await ws.send_bytes(audio) #返回音频数据
if is_end: await ws.send_text(json.dumps(response_message, ensure_ascii=False)) #返回文本信息
logger.debug(f"llm返回结果: {llm_response}") logger.debug(f"websocket返回: {sentence}")
await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False)) if is_end:
is_end = False #重置is_end标志位 logger.debug(f"llm返回结果: {llm_response}")
messages.append({'role': 'assistant', "content": llm_response}) await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话 is_end = False #重置is_end标志位
redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session messages.append({'role': 'assistant', "content": llm_response})
is_first = True session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
llm_response = "" redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
is_first = True
llm_response = ""
except Exception as e: except Exception as e:
logger.error(f"处理llm返回结果发生错误: {str(e)}") logger.error(f"处理llm返回结果发生错误: {str(e)}")
chat_finished_event.set() chat_finished_event.set()
@ -418,34 +419,35 @@ async def scl_llm_handler(ws,session_id,response_type,llm_info,tts_info,db,redis
'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}", 'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
response = requests.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload,stream=True) async with aiohttp.ClientSession() as client:
for chunk in response.iter_lines(): async with client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload) as response: #发送大模型请求
chunk_data = parseChunkDelta(chunk) async for chunk in response.content.iter_any():
is_end = chunk_data == "end" chunk_data = parseChunkDelta(chunk)
if not is_end: is_end = chunk_data == "end"
llm_response += chunk_data if not is_end:
sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end) llm_response += chunk_data
for sentence in sentences: sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
if response_type == RESPONSE_TEXT: for sentence in sentences:
logger.debug(f"websocket返回: {sentence}") if response_type == RESPONSE_TEXT:
response_message = {"type": "text", "code":200, "msg": sentence} logger.debug(f"websocket返回: {sentence}")
await ws.send_text(json.dumps(response_message, ensure_ascii=False)) response_message = {"type": "text", "code":200, "msg": sentence}
elif response_type == RESPONSE_AUDIO: await ws.send_text(json.dumps(response_message, ensure_ascii=False))
sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True) elif response_type == RESPONSE_AUDIO:
response_message = {"type": "text", "code":200, "msg": sentence} sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
await ws.send_bytes(audio) response_message = {"type": "text", "code":200, "msg": sentence}
await ws.send_text(json.dumps(response_message, ensure_ascii=False)) await ws.send_bytes(audio)
logger.debug(f"websocket返回: {sentence}") await ws.send_text(json.dumps(response_message, ensure_ascii=False))
if is_end: logger.debug(f"websocket返回: {sentence}")
logger.debug(f"llm返回结果: {llm_response}") if is_end:
await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False)) logger.debug(f"llm返回结果: {llm_response}")
is_end = False await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
is_end = False
messages.append({'role': 'assistant', "content": llm_response}) messages.append({'role': 'assistant', "content": llm_response})
session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话 session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
is_first = True is_first = True
llm_response = "" llm_response = ""
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
except Exception as e: except Exception as e:
@ -578,29 +580,30 @@ async def voice_call_llm_handler(ws,session_id,llm_info,tts_info,db,redis,asr_re
'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}", 'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
response = requests.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload,stream=True) async with aiohttp.ClientSession() as client:
for chunk in response.iter_lines(): async with client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload) as response: #发送大模型请求
chunk_data = parseChunkDelta(chunk) async for chunk in response.content.iter_any():
is_end = chunk_data == "end" chunk_data = parseChunkDelta(chunk)
if not is_end: is_end = chunk_data == "end"
llm_response += chunk_data if not is_end:
sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end) llm_response += chunk_data
for sentence in sentences: sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
sr,audio = tts.synthesize(sentence, tts_info["language"], tts_info["speaker_id"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"], return_bytes=True) for sentence in sentences:
text_response = {"type": "llm_text", "code": 200, "msg": sentence} sr,audio = tts.synthesize(sentence, tts_info["language"], tts_info["speaker_id"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"], return_bytes=True)
await ws.send_bytes(audio) #返回音频二进制流数据 text_response = {"type": "llm_text", "code": 200, "msg": sentence}
await ws.send_text(json.dumps(text_response, ensure_ascii=False)) #返回文本数据 await ws.send_bytes(audio) #返回音频二进制流数据
logger.debug(f"llm返回结果: {sentence}") await ws.send_text(json.dumps(text_response, ensure_ascii=False)) #返回文本数据
if is_end: logger.debug(f"llm返回结果: {sentence}")
logger.debug(f"llm返回结果: {llm_response}") if is_end:
await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False)) logger.debug(f"llm返回结果: {llm_response}")
is_end = False await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
is_end = False
messages.append({'role': 'assistant', "content": llm_response}) messages.append({'role': 'assistant', "content": llm_response})
session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话 session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
is_first = True is_first = True
llm_response = "" llm_response = ""
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
except Exception as e: except Exception as e: