1
0
Fork 0

由于大模型返回后处理改为同步,所以改回requests库

This commit is contained in:
Killua777 2024-05-04 11:49:23 +08:00
parent f11339ff92
commit 75016e3009
1 changed files with 85 additions and 122 deletions

View File

@ -10,7 +10,7 @@ from config import get_config
import uuid import uuid
import json import json
import asyncio import asyncio
import httpx import requests
# 依赖注入获取logger # 依赖注入获取logger
logger = get_logger() logger = get_logger()
@ -49,10 +49,10 @@ def get_session_content(session_id,redis,db):
#解析大模型流式返回内容 #解析大模型流式返回内容
def parseChunkDelta(chunk): def parseChunkDelta(chunk):
try: try:
if chunk == "": if chunk == b"":
return "" return ""
chunk_json_str = chunk[6:] decoded_data = chunk.decode('utf-8')
parsed_data = json.loads(chunk_json_str) parsed_data = json.loads(decoded_data[6:])
if 'delta' in parsed_data['choices'][0]: if 'delta' in parsed_data['choices'][0]:
delta_content = parsed_data['choices'][0]['delta'] delta_content = parsed_data['choices'][0]['delta']
return delta_content['content'] return delta_content['content']
@ -242,72 +242,36 @@ async def sct_llm_handler(ws,session_id,response_type,llm_info,tts_info,db,redis
'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}", 'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
async with httpx.AsyncClient() as client: response = requests.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload,stream=True)
response = await client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload) for chunk in response.iter_lines():
async for chunk in response.aiter_lines(): chunk_data = parseChunkDelta(chunk)
chunk_data = parseChunkDelta(chunk) is_end = chunk_data == "end"
is_end = chunk_data == "end" if not is_end:
if not is_end: llm_response += chunk_data
llm_response += chunk_data sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
for sentence in sentences:
if response_type == RESPONSE_TEXT:
response_message = {"type": "text", "code":200, "msg": sentence}
await ws.send_text(json.dumps(response_message, ensure_ascii=False))
elif response_type == RESPONSE_AUDIO:
sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
response_message = {"type": "text", "code":200, "msg": sentence}
await ws.send_bytes(audio)
await ws.send_text(json.dumps(response_message, ensure_ascii=False))
logger.debug(f"websocket返回: {sentence}")
if is_end:
logger.debug(f"llm返回结果: {llm_response}")
await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
is_end = False
session_content = get_session_content(session_id,redis,db)
messages = json.loads(session_content["messages"])
messages.append({'role': 'assistant', "content": llm_response})
session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
is_first = True
llm_response = ""
chat_finished_event.set()
#大模型返回断句
async def sct_llm_response_handler(session_id,redis,db,llm_response_q,split_result_q,llm_response_finish_event):
logger.debug("llm返回处理函数启动")
llm_response = ""
current_sentence = ""
is_first = True
while not (llm_response_finish_event.is_set() and llm_response_q.empty()):
llm_chunk = await llm_response_q.get()
llm_response += llm_chunk
sentences, current_sentence, is_first = split_string_with_punctuation(current_sentence, llm_chunk, is_first)
for sentence in sentences: for sentence in sentences:
await split_result_q.put(sentence) if response_type == RESPONSE_TEXT:
session_content = get_session_content(session_id,redis,db) response_message = {"type": "text", "code":200, "msg": sentence}
messages = json.loads(session_content["messages"]) await ws.send_text(json.dumps(response_message, ensure_ascii=False))
messages.append({'role': 'assistant', "content": llm_response}) elif response_type == RESPONSE_AUDIO:
session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话 sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session response_message = {"type": "text", "code":200, "msg": sentence}
logger.debug(f"llm返回结果: {llm_response}") await ws.send_bytes(audio)
await ws.send_text(json.dumps(response_message, ensure_ascii=False))
logger.debug(f"websocket返回: {sentence}")
if is_end:
logger.debug(f"llm返回结果: {llm_response}")
await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
is_end = False
#文本返回及语音合成 session_content = get_session_content(session_id,redis,db)
async def sct_response_handler(ws,tts_info,response_type,split_result_q,llm_response_finish_event,chat_finish_event): messages = json.loads(session_content["messages"])
logger.debug("返回处理函数启动") messages.append({'role': 'assistant', "content": llm_response})
while not (llm_response_finish_event.is_set() and split_result_q.empty()): session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
sentence = await split_result_q.get() redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
if response_type == RESPONSE_TEXT: is_first = True
response_message = {"type": "text", "code":200, "msg": sentence} llm_response = ""
await ws.send_text(json.dumps(response_message, ensure_ascii=False)) chat_finished_event.set()
elif response_type == RESPONSE_AUDIO:
sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
response_message = {"type": "text", "code":200, "msg": sentence}
await ws.send_bytes(audio)
await ws.send_text(json.dumps(response_message, ensure_ascii=False))
logger.debug(f"websocket返回: {sentence}")
chat_finish_event.set()
async def streaming_chat_temporary_handler(ws: WebSocket, db, redis): async def streaming_chat_temporary_handler(ws: WebSocket, db, redis):
logger.debug("streaming chat temporary websocket 连接建立") logger.debug("streaming chat temporary websocket 连接建立")
@ -417,36 +381,36 @@ async def scl_llm_handler(ws,session_id,response_type,llm_info,tts_info,db,redis
'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}", 'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
async with httpx.AsyncClient() as client: response = requests.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload,stream=True)
response = await client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload) for chunk in response.iter_lines():
async for chunk in response.aiter_lines(): chunk_data = parseChunkDelta(chunk)
chunk_data = parseChunkDelta(chunk) is_end = chunk_data == "end"
is_end = chunk_data == "end" if not is_end:
if not is_end: llm_response += chunk_data
llm_response += chunk_data sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end) for sentence in sentences:
for sentence in sentences: if response_type == RESPONSE_TEXT:
if response_type == RESPONSE_TEXT: logger.debug(f"websocket返回: {sentence}")
response_message = {"type": "text", "code":200, "msg": sentence} response_message = {"type": "text", "code":200, "msg": sentence}
await ws.send_text(json.dumps(response_message, ensure_ascii=False)) await ws.send_text(json.dumps(response_message, ensure_ascii=False))
elif response_type == RESPONSE_AUDIO: elif response_type == RESPONSE_AUDIO:
sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True) sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
response_message = {"type": "text", "code":200, "msg": sentence} response_message = {"type": "text", "code":200, "msg": sentence}
await ws.send_bytes(audio) await ws.send_bytes(audio)
await ws.send_text(json.dumps(response_message, ensure_ascii=False)) await ws.send_text(json.dumps(response_message, ensure_ascii=False))
logger.debug(f"websocket返回: {sentence}") logger.debug(f"websocket返回: {sentence}")
if is_end: if is_end:
logger.debug(f"llm返回结果: {llm_response}") logger.debug(f"llm返回结果: {llm_response}")
await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False)) await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
is_end = False is_end = False
session_content = get_session_content(session_id,redis,db) session_content = get_session_content(session_id,redis,db)
messages = json.loads(session_content["messages"]) messages = json.loads(session_content["messages"])
messages.append({'role': 'assistant', "content": llm_response}) messages.append({'role': 'assistant', "content": llm_response})
session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话 session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
is_first = True is_first = True
llm_response = "" llm_response = ""
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
chat_finished_event.set() chat_finished_event.set()
@ -562,32 +526,31 @@ async def voice_call_llm_handler(ws,session_id,llm_info,tts_info,db,redis,asr_re
'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}", 'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
async with httpx.AsyncClient() as client: response = requests.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload,stream=True)
response = await client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload) for chunk in response.iter_lines():
async for chunk in response.aiter_lines(): chunk_data = parseChunkDelta(chunk)
chunk_data = parseChunkDelta(chunk) is_end = chunk_data == "end"
is_end = chunk_data == "end" if not is_end:
if not is_end: llm_response += chunk_data
llm_response += chunk_data sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end) for sentence in sentences:
for sentence in sentences: sr,audio = tts.synthesize(sentence, tts_info["language"], tts_info["speaker_id"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"], return_bytes=True)
sr,audio = tts.synthesize(sentence, tts_info["language"], tts_info["speaker_id"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"], return_bytes=True) text_response = {"type": "llm_text", "code": 200, "msg": sentence}
text_response = {"type": "llm_text", "code": 200, "msg": sentence} await ws.send_bytes(audio) #返回音频二进制流数据
await ws.send_bytes(audio) #返回音频二进制流数据 await ws.send_text(json.dumps(text_response, ensure_ascii=False)) #返回文本数据
await ws.send_text(json.dumps(text_response, ensure_ascii=False)) #返回文本数据 logger.debug(f"llm返回结果: {sentence}")
logger.debug(f"llm返回结果: {sentence}") if is_end:
if is_end: logger.debug(f"llm返回结果: {llm_response}")
logger.debug(f"llm返回结果: {llm_response}") await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False)) is_end = False
is_end = False
session_content = get_session_content(session_id,redis,db) session_content = get_session_content(session_id,redis,db)
messages = json.loads(session_content["messages"]) messages = json.loads(session_content["messages"])
messages.append({'role': 'assistant', "content": llm_response}) messages.append({'role': 'assistant', "content": llm_response})
session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话 session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
is_first = True is_first = True
llm_response = "" llm_response = ""
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
voice_call_end_event.set() voice_call_end_event.set()