diff --git a/app/controllers/chat.py b/app/controllers/chat.py
index 9ff9464..f187a32 100644
--- a/app/controllers/chat.py
+++ b/app/controllers/chat.py
@@ -10,7 +10,7 @@ from config import get_config
 import uuid
 import json
 import asyncio
-import httpx
+import requests
 
 # 依赖注入获取logger
 logger = get_logger()
@@ -49,10 +49,10 @@ def get_session_content(session_id,redis,db):
 #解析大模型流式返回内容
 def parseChunkDelta(chunk):
     try:
-        if chunk == "":
+        if chunk == b"":
             return ""
-        chunk_json_str = chunk[6:]
-        parsed_data = json.loads(chunk_json_str)
+        decoded_data = chunk.decode('utf-8')
+        parsed_data = json.loads(decoded_data[6:])
         if 'delta' in parsed_data['choices'][0]:
             delta_content = parsed_data['choices'][0]['delta']
             return delta_content['content']
@@ -242,72 +242,36 @@ async def sct_llm_handler(ws,session_id,response_type,llm_info,tts_info,db,redis
             'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
             'Content-Type': 'application/json'
         }
-        async with httpx.AsyncClient() as client:
-            response = await client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload)
-            async for chunk in response.aiter_lines():
-                chunk_data = parseChunkDelta(chunk)
-                is_end = chunk_data == "end"
-                if not is_end:
-                    llm_response += chunk_data
-                    sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
-                    for sentence in sentences:
-                        if response_type == RESPONSE_TEXT:
-                            response_message = {"type": "text", "code":200, "msg": sentence}
-                            await ws.send_text(json.dumps(response_message, ensure_ascii=False))
-                        elif response_type == RESPONSE_AUDIO:
-                            sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
-                            response_message = {"type": "text", "code":200, "msg": sentence}
-                            await ws.send_bytes(audio)
-                            await ws.send_text(json.dumps(response_message, ensure_ascii=False))
-                        logger.debug(f"websocket返回: {sentence}")
-                if is_end:
-                    logger.debug(f"llm返回结果: {llm_response}")
-                    await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
-                    is_end = False
-
-                    session_content = get_session_content(session_id,redis,db)
-                    messages = json.loads(session_content["messages"])
-                    messages.append({'role': 'assistant', "content": llm_response})
-                    session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
-                    redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
-                    is_first = True
-                    llm_response = ""
-    chat_finished_event.set()
-
-#大模型返回断句
-async def sct_llm_response_handler(session_id,redis,db,llm_response_q,split_result_q,llm_response_finish_event):
-    logger.debug("llm返回处理函数启动")
-    llm_response = ""
-    current_sentence = ""
-    is_first = True
-    while not (llm_response_finish_event.is_set() and llm_response_q.empty()):
-        llm_chunk = await llm_response_q.get()
-        llm_response += llm_chunk
-        sentences, current_sentence, is_first = split_string_with_punctuation(current_sentence, llm_chunk, is_first)
+        response = requests.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload,stream=True)
+        for chunk in response.iter_lines():
+            chunk_data = parseChunkDelta(chunk)
+            is_end = chunk_data == "end"
+            if not is_end:
+                llm_response += chunk_data
+                sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
                 for sentence in sentences:
-            await split_result_q.put(sentence)
-    session_content = get_session_content(session_id,redis,db)
-    messages = json.loads(session_content["messages"])
-    messages.append({'role': 'assistant', "content": llm_response})
-    session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
-    redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
-    logger.debug(f"llm返回结果: {llm_response}")
+                    if response_type == RESPONSE_TEXT:
+                        response_message = {"type": "text", "code":200, "msg": sentence}
+                        await ws.send_text(json.dumps(response_message, ensure_ascii=False))
+                    elif response_type == RESPONSE_AUDIO:
+                        sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
+                        response_message = {"type": "text", "code":200, "msg": sentence}
+                        await ws.send_bytes(audio)
+                        await ws.send_text(json.dumps(response_message, ensure_ascii=False))
+                    logger.debug(f"websocket返回: {sentence}")
+            if is_end:
+                logger.debug(f"llm返回结果: {llm_response}")
+                await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
+                is_end = False
 
-#文本返回及语音合成
-async def sct_response_handler(ws,tts_info,response_type,split_result_q,llm_response_finish_event,chat_finish_event):
-    logger.debug("返回处理函数启动")
-    while not (llm_response_finish_event.is_set() and split_result_q.empty()):
-        sentence = await split_result_q.get()
-        if response_type == RESPONSE_TEXT:
-            response_message = {"type": "text", "code":200, "msg": sentence}
-            await ws.send_text(json.dumps(response_message, ensure_ascii=False))
-        elif response_type == RESPONSE_AUDIO:
-            sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
-            response_message = {"type": "text", "code":200, "msg": sentence}
-            await ws.send_bytes(audio)
-            await ws.send_text(json.dumps(response_message, ensure_ascii=False))
-        logger.debug(f"websocket返回: {sentence}")
-    chat_finish_event.set()
+                session_content = get_session_content(session_id,redis,db)
+                messages = json.loads(session_content["messages"])
+                messages.append({'role': 'assistant', "content": llm_response})
+                session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
+                redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
+                is_first = True
+                llm_response = ""
+    chat_finished_event.set()
 
 async def streaming_chat_temporary_handler(ws: WebSocket, db, redis):
     logger.debug("streaming chat temporary websocket 连接建立")
@@ -417,36 +381,36 @@ async def scl_llm_handler(ws,session_id,response_type,llm_info,tts_info,db,redis
                 'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
                 'Content-Type': 'application/json'
             }
-            async with httpx.AsyncClient() as client:
-                response = await client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload)
-                async for chunk in response.aiter_lines():
-                    chunk_data = parseChunkDelta(chunk)
-                    is_end = chunk_data == "end"
-                    if not is_end:
-                        llm_response += chunk_data
-                        sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
-                        for sentence in sentences:
-                            if response_type == RESPONSE_TEXT:
-                                response_message = {"type": "text", "code":200, "msg": sentence}
-                                await ws.send_text(json.dumps(response_message, ensure_ascii=False))
-                            elif response_type == RESPONSE_AUDIO:
-                                sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
-                                response_message = {"type": "text", "code":200, "msg": sentence}
-                                await ws.send_bytes(audio)
-                                await ws.send_text(json.dumps(response_message, ensure_ascii=False))
-                            logger.debug(f"websocket返回: {sentence}")
-                    if is_end:
-                        logger.debug(f"llm返回结果: {llm_response}")
-                        await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
-                        is_end = False
+            response = requests.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload,stream=True)
+            for chunk in response.iter_lines():
+                chunk_data = parseChunkDelta(chunk)
+                is_end = chunk_data == "end"
+                if not is_end:
+                    llm_response += chunk_data
+                    sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
+                    for sentence in sentences:
+                        if response_type == RESPONSE_TEXT:
+                            logger.debug(f"websocket返回: {sentence}")
+                            response_message = {"type": "text", "code":200, "msg": sentence}
+                            await ws.send_text(json.dumps(response_message, ensure_ascii=False))
+                        elif response_type == RESPONSE_AUDIO:
+                            sr,audio = tts.synthesize(sentence, tts_info["speaker_id"], tts_info["language"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"],return_bytes=True)
+                            response_message = {"type": "text", "code":200, "msg": sentence}
+                            await ws.send_bytes(audio)
+                            await ws.send_text(json.dumps(response_message, ensure_ascii=False))
+                        logger.debug(f"websocket返回: {sentence}")
+                if is_end:
+                    logger.debug(f"llm返回结果: {llm_response}")
+                    await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
+                    is_end = False
 
-                        session_content = get_session_content(session_id,redis,db)
-                        messages = json.loads(session_content["messages"])
-                        messages.append({'role': 'assistant', "content": llm_response})
-                        session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
-                        redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
-                        is_first = True
-                        llm_response = ""
+                    session_content = get_session_content(session_id,redis,db)
+                    messages = json.loads(session_content["messages"])
+                    messages.append({'role': 'assistant', "content": llm_response})
+                    session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
+                    redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
+                    is_first = True
+                    llm_response = ""
         except asyncio.TimeoutError:
             continue
     chat_finished_event.set()
@@ -562,32 +526,31 @@ async def voice_call_llm_handler(ws,session_id,llm_info,tts_info,db,redis,asr_re
                 'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
                 'Content-Type': 'application/json'
             }
-            async with httpx.AsyncClient() as client:
-                response = await client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload)
-                async for chunk in response.aiter_lines():
-                    chunk_data = parseChunkDelta(chunk)
-                    is_end = chunk_data == "end"
-                    if not is_end:
-                        llm_response += chunk_data
-                        sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
-                        for sentence in sentences:
-                            sr,audio = tts.synthesize(sentence, tts_info["language"], tts_info["speaker_id"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"], return_bytes=True)
-                            text_response = {"type": "llm_text", "code": 200, "msg": sentence}
-                            await ws.send_bytes(audio) #返回音频二进制流数据
-                            await ws.send_text(json.dumps(text_response, ensure_ascii=False)) #返回文本数据
-                            logger.debug(f"llm返回结果: {sentence}")
-                    if is_end:
-                        logger.debug(f"llm返回结果: {llm_response}")
-                        await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
-                        is_end = False
+            response = requests.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload,stream=True)
+            for chunk in response.iter_lines():
+                chunk_data = parseChunkDelta(chunk)
+                is_end = chunk_data == "end"
+                if not is_end:
+                    llm_response += chunk_data
+                    sentences,current_sentence,is_first = split_string_with_punctuation(current_sentence,chunk_data,is_first,is_end)
+                    for sentence in sentences:
+                        sr,audio = tts.synthesize(sentence, tts_info["language"], tts_info["speaker_id"], tts_info["noise_scale"], tts_info["noise_scale_w"], tts_info["length_scale"], return_bytes=True)
+                        text_response = {"type": "llm_text", "code": 200, "msg": sentence}
+                        await ws.send_bytes(audio) #返回音频二进制流数据
+                        await ws.send_text(json.dumps(text_response, ensure_ascii=False)) #返回文本数据
+                        logger.debug(f"llm返回结果: {sentence}")
+                if is_end:
+                    logger.debug(f"llm返回结果: {llm_response}")
+                    await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""}, ensure_ascii=False))
+                    is_end = False
 
-                        session_content = get_session_content(session_id,redis,db)
-                        messages = json.loads(session_content["messages"])
-                        messages.append({'role': 'assistant', "content": llm_response})
-                        session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
-                        redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
-                        is_first = True
-                        llm_response = ""
+                    session_content = get_session_content(session_id,redis,db)
+                    messages = json.loads(session_content["messages"])
+                    messages.append({'role': 'assistant', "content": llm_response})
+                    session_content["messages"] = json.dumps(messages,ensure_ascii=False) #更新对话
+                    redis.set(session_id,json.dumps(session_content,ensure_ascii=False)) #更新session
+                    is_first = True
+                    llm_response = ""
        except asyncio.TimeoutError:
            continue
    voice_call_end_event.set()