from ..schemas.chat_schema import *
from ..dependencies.logger import get_logger
from ..dependencies.summarizer import get_summarizer
from ..dependencies.asr import get_asr
from ..dependencies.tts import get_tts
from .controller_enum import *
from ..models import UserCharacter, Session, Character, User, Audio
from utils.audio_utils import VAD
from fastapi import WebSocket, HTTPException, status
from datetime import datetime
from utils.xf_asr_utils import generate_xf_asr_url
from config import get_config
import numpy as np
import uuid
import json
import asyncio
import aiohttp

# Injected singleton services (dependency-injection style accessors).
logger = get_logger()          # application logger
summarizer = get_summarizer()  # conversation-context summarization service
asr = get_asr()                # speech recognition (ASR) service
tts = get_tts()                # text-to-speech (TTS) service
Config = get_config()          # application configuration


# ---------------------- utility functions -------------------------
def get_session_content(session_id, redis, db):
    """Return the session content dict for `session_id`.

    Reads Redis first and falls back to the database.
    Raises HTTPException(404) when the session exists in neither store.
    """
    if redis.exists(session_id):
        session_content_str = redis.get(session_id)
    else:
        session_db = db.query(Session).filter(Session.id == session_id).first()
        if not session_db:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
        session_content_str = session_db.content
    return json.loads(session_content_str)


def parseChunkDelta(chunk):
    """Parse one streamed LLM chunk (SSE bytes of the form b"data: {...}").

    Returns a (token_count, text) pair:
      * (-1, text)          -- an incremental content delta,
      * (total_tokens, "")  -- the final usage chunk (token_count > 0),
      * (1, "")             -- empty or malformed chunk, treated as stream end.
    """
    decoded_data = ""
    try:
        if chunk == b"":
            return 1, ""
        decoded_data = chunk.decode('utf-8')
        parsed_data = json.loads(decoded_data[6:])  # strip the "data: " prefix
        if 'delta' in parsed_data['choices'][0]:
            delta_content = parsed_data['choices'][0]['delta']
            return -1, delta_content['content']
        else:
            return parsed_data['usage']['total_tokens'], ""
    except (KeyError, json.JSONDecodeError):
        # Both failure modes were handled identically; merged into one clause.
        logger.error(f"error chunk: {decoded_data}")
        return 1, ""


def split_string_with_punctuation(current_sentence, text, is_first, is_end):
    """Accumulate streamed text and cut it into sentences at punctuation.

    The very first sentence is cut at any punctuation (including commas) to
    minimise time-to-first-response; later sentences are cut only at
    sentence-ending punctuation. When `is_end` is set, whatever remains is
    flushed as a final sentence.

    Returns (sentences, remaining_partial_sentence, is_first).
    """
    try:
        result = []
        if is_end:
            if current_sentence:
                result.append(current_sentence)
                current_sentence = ''
            return result, current_sentence, is_first
        for char in text:
            current_sentence += char
            if is_first and char in ',.?!,。?!':
                result.append(current_sentence)
                current_sentence = ''
                is_first = False
            elif char in '。?!':
                result.append(current_sentence)
                current_sentence = ''
        return result, current_sentence, is_first
    except Exception as e:
        logger.error(f"断句时出现错误: {str(e)}")
        # BUG FIX: the original returned None here, which made every caller
        # crash when unpacking the result; return a safe triple instead.
        return [], current_sentence, is_first


def vad_preprocess(audio):
    """Cut one 1280-byte VAD frame off the front of `audio`.

    Returns (frame, rest). Input shorter than one frame yields a filler
    frame and an empty remainder.
    """
    if len(audio) < 1280:
        # BUG FIX: the original returned a bare string here while all
        # callers unpack two values; return a (frame, rest) tuple.
        return 'A' * 1280, ''
    return audio[:1280], audio[1280:]


def update_session_activity(session_id, db):
    """Stamp the session's last_activity with the current time."""
    try:
        db.query(Session).filter(Session.id == session_id).update({"last_activity": datetime.now()})
        db.commit()
    except Exception as e:
        # BUG FIX: was `db.roolback()` (typo), which raised AttributeError
        # and left the failed transaction open.
        db.rollback()
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))


def get_emb(session_id, db):
    """Load the voice-clone target embedding for the session's user.

    Returns a (1, 256, 1) numpy array, or an empty array when no audio
    record exists (callers check `.size == 0`).
    """
    try:
        session_record = db.query(Session).filter(Session.id == session_id).first()
        user_character_record = db.query(UserCharacter).filter(
            UserCharacter.id == session_record.user_character_id).first()
        audio_record = db.query(Audio).filter(Audio.user_id == user_character_record.user_id).first()
        emb_npy = np.frombuffer(audio_record.emb_data, dtype=np.int32)
        return np.reshape(emb_npy, (1, 256, 1))
    except Exception as e:
        logger.error("未找到音频:" + str(e))
        return np.array([])
# --------------------------------------------------------


# Create a new chat
async def create_chat_handler(chat: ChatCreateRequest, db, redis):
    """Create a UserCharacter binding plus its Session (DB + Redis copy)."""
    # Create the new UserCharacter record.
    new_chat = UserCharacter(user_id=chat.user_id, character_id=chat.character_id)
    try:
        db.add(new_chat)
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))

    # Look up the character and build the system prompt.
    db_character = db.query(Character).filter(Character.id == chat.character_id).first()
    db_user = db.query(User).filter(User.id == chat.user_id).first()
    system_prompt = f"""你现在正在扮演"{db_character.name}"这一角色。我希望你模仿{db_character.name}的人格,包括知识、语言和行为方式、性格等。在角色扮演中,你需要遵守以下要求:1. 你必须始终保持角色扮演并待在角色设定的情景中,不得擅自跳出角色扮演,不得说你不知道角色相关信息或你是一个AI。\n2. 保持简短、通俗易懂的口语化方式进行对话。\n3. 为了使对话更生动,你需要在对话中添加文字形式的表情和动作,用括号包裹,比如"早上好,主人。(双手提起裙摆)"。\n\n你需要扮演的角色的信息是:{db_character.description}\n\n"""
    if db_character.world_scenario:
        system_prompt += f"所处的世界背景是:{db_character.world_scenario}"
    if db_character.emojis:
        system_prompt += f"尽可能多地使用这些表情{db_character.emojis}\n\n"
    if db_character.dialogues:
        system_prompt += f"以下是{db_character.name}这一角色的对话,请你参考:\n\n{db_character.dialogues}"

    # Create the new Session record.
    session_id = str(uuid.uuid4())
    user_id = chat.user_id
    messages = json.dumps([{"role": "system", "content": system_prompt}], ensure_ascii=False)
    tts_info = {
        "language": 0,
        "speaker_id": db_character.voice_id,
        "noise_scale": 0.1,
        "noise_scale_w": 0.668,
        "length_scale": 1.2
    }
    llm_info = {
        "model": "abab5.5-chat",
        "temperature": 1,
        "top_p": 0.9,
    }
    user_info = {
        "character": "",
        "events": []
    }
    # Session content stores nested JSON strings (not nested dicts).
    tts_info_str = json.dumps(tts_info, ensure_ascii=False)
    llm_info_str = json.dumps(llm_info, ensure_ascii=False)
    user_info_str = json.dumps(user_info, ensure_ascii=False)
    token = 0
    content = {"user_id": user_id, "messages": messages, "user_info": user_info_str,
               "tts_info": tts_info_str, "llm_info": llm_info_str, "token": token}
    new_session = Session(id=session_id, user_character_id=new_chat.id,
                          content=json.dumps(content, ensure_ascii=False),
                          last_activity=datetime.now(), is_permanent=False)

    # Persist the session to the DB and mirror it in Redis.
    db.add(new_session)
    db.commit()
    redis.set(session_id, json.dumps(content, ensure_ascii=False))

    chat_create_data = ChatCreateData(user_character_id=new_chat.id, session_id=session_id,
                                      createdAt=datetime.now().isoformat())
    return ChatCreateResponse(status="success", message="创建聊天成功", data=chat_create_data)


# Delete a chat
async def delete_chat_handler(user_character_id, db, redis):
    """Delete the UserCharacter, its Session, and the Redis session copy."""
    user_character_record = db.query(UserCharacter).filter(UserCharacter.id == user_character_id).first()
    if not user_character_record:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="UserCharacter not found")
    session_record = db.query(Session).filter(Session.user_character_id == user_character_id).first()
    # Redis deletion is best-effort; DB deletion below is authoritative.
    try:
        redis.delete(session_record.id)
    except Exception as e:
        logger.error(f"删除Redis中Session记录时发生错误: {str(e)}")
    try:
        db.delete(session_record)
        db.delete(user_character_record)
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    chat_delete_data = ChatDeleteData(deletedAt=datetime.now().isoformat())
    return ChatDeleteResponse(status="success", message="删除聊天成功", data=chat_delete_data)


# Non-streaming chat (not implemented yet)
async def non_streaming_chat_handler(chat: ChatNonStreamRequest, db, redis):
    pass


# ------------------------- single-round streaming chat -------------------------
async def sct_user_input_handler(ws, user_input_q, llm_input_q, future_session_id,
                                 future_response_type, user_input_finish_event):
    """Read websocket frames for one round: resolve session id / response type
    from the first frame, then queue audio (or direct text) until is_end."""
    logger.debug("用户输入处理函数启动")
    is_future_done = False
    try:
        while not user_input_finish_event.is_set():
            sct_data_json = json.loads(await ws.receive_text())
            if not is_future_done:
                # First frame: publish session id and requested response type.
                future_session_id.set_result(sct_data_json['meta_info']['session_id'])
                if sct_data_json['meta_info']['voice_synthesize']:
                    future_response_type.set_result(RESPONSE_AUDIO)
                else:
                    future_response_type.set_result(RESPONSE_TEXT)
                is_future_done = True
            if sct_data_json['text']:
                # Direct text input bypasses ASR entirely.
                await llm_input_q.put(sct_data_json['text'])
                if not user_input_finish_event.is_set():
                    user_input_finish_event.set()
                break
            if sct_data_json['meta_info']['is_end']:
                await user_input_q.put(sct_data_json['audio'])
                if not user_input_finish_event.is_set():
                    user_input_finish_event.set()
                break
            await user_input_q.put(sct_data_json['audio'])
    except KeyError as ke:
        # Heartbeat frames carry 'state'/'method' instead of chat fields.
        if 'state' in sct_data_json and 'method' in sct_data_json:
            logger.debug("收到心跳包")
    except Exception as e:
        logger.error(f"用户输入处理函数发生错误: {str(e)}")


async def sct_asr_handler(session_id, user_input_q, llm_input_q, user_input_finish_event):
    """Drain queued audio through streaming ASR, add punctuation and an
    emotion tag, then hand the final text to the LLM queue."""
    logger.debug("语音识别函数启动")
    is_signup = False
    audio = ""
    try:
        current_message = ""
        while not (user_input_finish_event.is_set() and user_input_q.empty()):
            if not is_signup:
                asr.session_signup(session_id)
                is_signup = True
            audio_data = await user_input_q.get()
            audio += audio_data
            asr_result = asr.streaming_recognize(session_id, audio_data)
            current_message += ''.join(asr_result['text'])
        # Flush the recognizer once all audio has been consumed.
        asr_result = asr.streaming_recognize(session_id, b'', is_end=True)
        current_message += ''.join(asr_result['text'])
        current_message = asr.punctuation_correction(current_message)
        emotion_dict = asr.emtion_recognition(audio)  # emotion recognition (service API spelling)
        if not isinstance(emotion_dict, str):  # a str result means recognition failed
            max_index = emotion_dict['scores'].index(max(emotion_dict['scores']))
            current_message = f"{current_message},当前说话人的情绪:{emotion_dict['labels'][max_index]}"
        await llm_input_q.put(current_message)
        asr.session_signout(session_id)
    except Exception as e:
        asr.session_signout(session_id)
        logger.error(f"语音识别函数发生错误: {str(e)}")
    logger.debug(f"接收到用户消息: {current_message}")


async def sct_llm_handler(ws, session_id, response_type, llm_info, tts_info, db, redis,
                          llm_input_q, chat_finished_event):
    """Call the LLM with the recognized text, stream sentences back over the
    websocket (text or synthesized audio), persist the turn, and summarize
    the history when the token budget runs high."""
    logger.debug("llm调用函数启动")
    try:
        llm_response = ""
        current_sentence = ""
        is_first = True
        is_end = False
        session_content = get_session_content(session_id, redis, db)
        user_info = json.loads(session_content["user_info"])
        messages = json.loads(session_content["messages"])
        current_message = await llm_input_q.get()
        messages.append({'role': 'user', "content": current_message})
        # BUG FIX: the original did `messages_send = messages`, aliasing the
        # list despite the "copy" comment, so the user-info prefix leaked into
        # the persisted history. Copy the message dicts before prefixing.
        messages_send = [dict(m) for m in messages]
        messages_send[-1]['content'] = (
            f"用户性格:{user_info['character']}\n事件摘要:{user_info['events']}"
            + messages_send[-1]['content'])
        payload = json.dumps({
            "model": llm_info["model"],
            "stream": True,
            "messages": messages_send,
            "max_tokens": 10000,
            "temperature": llm_info["temperature"],
            "top_p": llm_info["top_p"]
        })
        headers = {
            'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
            'Content-Type': 'application/json'
        }
        target_se = get_emb(session_id, db)
    except Exception as e:
        logger.error(f"编辑http请求时发生错误: {str(e)}")
    try:
        async with aiohttp.ClientSession() as client:
            # Send the streaming LLM request.
            async with client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload) as response:
                async for chunk in response.content.iter_any():
                    token_count, chunk_data = parseChunkDelta(chunk)
                    is_end = token_count > 0  # positive token count marks the final chunk
                    if not is_end:
                        llm_response += chunk_data
                    sentences, current_sentence, is_first = split_string_with_punctuation(
                        current_sentence, chunk_data, is_first, is_end)
                    for sentence in sentences:
                        if response_type == RESPONSE_TEXT:
                            response_message = {"type": "text", "code": 200, "msg": sentence}
                            await ws.send_text(json.dumps(response_message, ensure_ascii=False))
                        elif response_type == RESPONSE_AUDIO:
                            # Use the base voice when no cloned embedding exists.
                            if target_se.size == 0:
                                audio, sr = tts._base_tts(text=sentence,
                                                          noise_scale=tts_info["noise_scale"],
                                                          noise_scale_w=tts_info["noise_scale_w"],
                                                          speed=tts_info["length_scale"])
                            else:
                                audio, sr = tts.synthesize(text=sentence,
                                                           noise_scale=tts_info["noise_scale"],
                                                           noise_scale_w=tts_info["noise_scale_w"],
                                                           speed=tts_info["length_scale"],
                                                           target_se=target_se)
                            response_message = {"type": "text", "code": 200, "msg": sentence}
                            await ws.send_bytes(audio)  # binary audio first
                            await ws.send_text(json.dumps(response_message, ensure_ascii=False))
                        logger.debug(f"websocket返回: {sentence}")
                    if is_end:
                        logger.debug(f"llm返回结果: {llm_response}")
                        await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""},
                                                      ensure_ascii=False))
                        is_end = False  # reset the end flag
                        # Persist the completed assistant turn.
                        messages.append({'role': 'assistant', "content": llm_response})
                        session_content["messages"] = json.dumps(messages, ensure_ascii=False)
                        redis.set(session_id, json.dumps(session_content, ensure_ascii=False))
                        is_first = True
                        llm_response = ""
                        # Summarize when usage exceeds 70% of the summarizer budget.
                        if token_count > summarizer.max_token * 0.7:
                            system_prompt = messages[0]['content']
                            summary = await summarizer.summarize(messages)
                            events = user_info['events']
                            events.append(summary['event'])
                            session_content['messages'] = json.dumps(
                                [{'role': 'system', 'content': system_prompt}], ensure_ascii=False)
                            session_content['user_info'] = json.dumps(
                                {'character': summary['character'], 'events': events}, ensure_ascii=False)
                            redis.set(session_id, json.dumps(session_content, ensure_ascii=False))
                            logger.debug(f"总结后session_content: {session_content}")
    except Exception as e:
        logger.error(f"处理llm返回结果发生错误: {str(e)}")
    chat_finished_event.set()


async def streaming_chat_temporary_handler(ws: WebSocket, db, redis):
    """Single-round streaming chat: wire input, ASR and LLM tasks together
    and close the websocket once the round completes."""
    logger.debug("streaming chat temporary websocket 连接建立")
    user_input_q = asyncio.Queue()  # raw user audio
    llm_input_q = asyncio.Queue()   # text destined for the LLM
    user_input_finish_event = asyncio.Event()
    chat_finished_event = asyncio.Event()
    future_session_id = asyncio.Future()
    future_response_type = asyncio.Future()
    asyncio.create_task(sct_user_input_handler(ws, user_input_q, llm_input_q, future_session_id,
                                               future_response_type, user_input_finish_event))
    session_id = await future_session_id  # resolved from the first frame
    update_session_activity(session_id, db)
    response_type = await future_response_type
    asyncio.create_task(sct_asr_handler(session_id, user_input_q, llm_input_q,
                                        user_input_finish_event))
    tts_info = json.loads(get_session_content(session_id, redis, db)["tts_info"])
    llm_info = json.loads(get_session_content(session_id, redis, db)["llm_info"])
    asyncio.create_task(sct_llm_handler(ws, session_id, response_type, llm_info, tts_info,
                                        db, redis, llm_input_q, chat_finished_event))
    while not chat_finished_event.is_set():
        await asyncio.sleep(1)
    await ws.send_text(json.dumps({"type": "close", "code": 200, "msg": ""}, ensure_ascii=False))
    await ws.close()
    logger.debug("streaming chat temporary websocket 连接断开")
# ---------------------------------------------------------------------------------------------------


# ------------------------------------------ lasting streaming chat ----------------------------------------------
async def scl_user_input_handler(ws, user_input_q, llm_input_q, future_session_id,
                                 future_response_type, input_finished_event):
    """Read websocket frames for a lasting chat: resolve session id / response
    type from the first frame, then queue audio frames (tagged with is_end)
    or direct text until the client sends is_close."""
    logger.debug("用户输入处理函数启动")
    is_future_done = False
    while not input_finished_event.is_set():
        try:
            scl_data_json = json.loads(await asyncio.wait_for(ws.receive_text(), timeout=3))
            if scl_data_json['is_close']:
                input_finished_event.set()
                break
            if not is_future_done:
                # First frame: publish session id and requested response type.
                future_session_id.set_result(scl_data_json['meta_info']['session_id'])
                if scl_data_json['meta_info']['voice_synthesize']:
                    future_response_type.set_result(RESPONSE_AUDIO)
                else:
                    future_response_type.set_result(RESPONSE_TEXT)
                is_future_done = True
            if scl_data_json['text']:
                # Direct text input bypasses ASR.
                await llm_input_q.put(scl_data_json['text'])
                continue
            if scl_data_json['meta_info']['is_end']:
                user_input_frame = {"audio": scl_data_json['audio'], "is_end": True}
                await user_input_q.put(user_input_frame)
            else:
                # BUG FIX: the original queued the end frame and then queued
                # the same audio again as a normal frame, feeding duplicate
                # audio to the recognizer; only one frame is queued now.
                user_input_frame = {"audio": scl_data_json['audio'], "is_end": False}
                await user_input_q.put(user_input_frame)
        except KeyError as ke:
            # Heartbeat frames carry 'state'/'method' instead of chat fields.
            if 'state' in scl_data_json and 'method' in scl_data_json:
                logger.debug("收到心跳包")
                continue
        except asyncio.TimeoutError:
            continue
        except Exception as e:
            logger.error(f"用户输入处理函数发生错误: {str(e)}")
            break


async def scl_asr_handler(session_id, user_input_q, llm_input_q, input_finished_event,
                          asr_finished_event):
    """Consume audio frames through streaming ASR; on each utterance end,
    add punctuation and an emotion tag and queue the text for the LLM."""
    logger.debug("语音识别函数启动")
    is_signup = False
    current_message = ""
    audio = ""
    while not (input_finished_event.is_set() and user_input_q.empty()):
        try:
            audio_frame = await asyncio.wait_for(user_input_q.get(), timeout=3)
            if not is_signup:
                asr.session_signup(session_id)
                is_signup = True
            if audio_frame['is_end']:
                asr_result = asr.streaming_recognize(session_id, audio_frame['audio'], is_end=True)
                current_message += ''.join(asr_result['text'])
                current_message = asr.punctuation_correction(current_message)
                audio += audio_frame['audio']
                emotion_dict = asr.emtion_recognition(audio)  # emotion recognition (service API spelling)
                if not isinstance(emotion_dict, str):  # a str result means recognition failed
                    max_index = emotion_dict['scores'].index(max(emotion_dict['scores']))
                    current_message = f"{current_message}当前说话人的情绪:{emotion_dict['labels'][max_index]}"
                await llm_input_q.put(current_message)
                logger.debug(f"接收到用户消息: {current_message}")
                current_message = ""
                audio = ""
            else:
                asr_result = asr.streaming_recognize(session_id, audio_frame['audio'])
                audio += audio_frame['audio']
                current_message += ''.join(asr_result['text'])
        except asyncio.TimeoutError:
            continue
        except Exception as e:
            logger.error(f"语音识别函数发生错误: {str(e)}")
            break
    asr.session_signout(session_id)
    asr_finished_event.set()


async def scl_llm_handler(ws, session_id, response_type, llm_info, tts_info, db, redis,
                          llm_input_q, asr_finished_event, chat_finished_event):
    """Per recognized utterance, call the streaming LLM, send sentences back
    (text or synthesized audio), persist the turn, and summarize the history
    when the token budget runs high."""
    logger.debug("llm调用函数启动")
    llm_response = ""
    current_sentence = ""
    is_first = True
    is_end = False
    while not (asr_finished_event.is_set() and llm_input_q.empty()):
        try:
            session_content = get_session_content(session_id, redis, db)
            messages = json.loads(session_content["messages"])
            user_info = json.loads(session_content["user_info"])
            current_message = await asyncio.wait_for(llm_input_q.get(), timeout=3)
            messages.append({'role': 'user', "content": current_message})
            payload = json.dumps({
                "model": llm_info["model"],
                "stream": True,
                "messages": messages,
                "max_tokens": 10000,
                "temperature": llm_info["temperature"],
                "top_p": llm_info["top_p"]
            })
            headers = {
                'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
                'Content-Type': 'application/json'
            }
            target_se = get_emb(session_id, db)
            async with aiohttp.ClientSession() as client:
                # Send the streaming LLM request.
                async with client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload) as response:
                    async for chunk in response.content.iter_any():
                        token_count, chunk_data = parseChunkDelta(chunk)
                        is_end = token_count > 0  # positive token count marks the final chunk
                        if not is_end:
                            llm_response += chunk_data
                        sentences, current_sentence, is_first = split_string_with_punctuation(
                            current_sentence, chunk_data, is_first, is_end)
                        for sentence in sentences:
                            if response_type == RESPONSE_TEXT:
                                logger.debug(f"websocket返回: {sentence}")
                                response_message = {"type": "text", "code": 200, "msg": sentence}
                                await ws.send_text(json.dumps(response_message, ensure_ascii=False))
                            elif response_type == RESPONSE_AUDIO:
                                # Use the base voice when no cloned embedding exists.
                                if target_se.size == 0:
                                    audio, sr = tts._base_tts(text=sentence,
                                                              noise_scale=tts_info["noise_scale"],
                                                              noise_scale_w=tts_info["noise_scale_w"],
                                                              speed=tts_info["length_scale"])
                                else:
                                    audio, sr = tts.synthesize(text=sentence,
                                                               noise_scale=tts_info["noise_scale"],
                                                               noise_scale_w=tts_info["noise_scale_w"],
                                                               speed=tts_info["length_scale"],
                                                               target_se=target_se)
                                response_message = {"type": "text", "code": 200, "msg": sentence}
                                await ws.send_bytes(audio)
                                await ws.send_text(json.dumps(response_message, ensure_ascii=False))
                                logger.debug(f"websocket返回: {sentence}")
                        if is_end:
                            logger.debug(f"llm返回结果: {llm_response}")
                            await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""},
                                                          ensure_ascii=False))
                            is_end = False
                            # Persist the completed assistant turn.
                            messages.append({'role': 'assistant', "content": llm_response})
                            session_content["messages"] = json.dumps(messages, ensure_ascii=False)
                            redis.set(session_id, json.dumps(session_content, ensure_ascii=False))
                            is_first = True
                            llm_response = ""
                            # Summarize when usage exceeds 70% of the summarizer budget.
                            if token_count > summarizer.max_token * 0.7:
                                system_prompt = messages[0]['content']
                                summary = await summarizer.summarize(messages)
                                events = user_info['events']
                                events.append(summary['event'])
                                session_content['messages'] = json.dumps(
                                    [{'role': 'system', 'content': system_prompt}], ensure_ascii=False)
                                session_content['user_info'] = json.dumps(
                                    {'character': summary['character'], 'events': events}, ensure_ascii=False)
                                redis.set(session_id, json.dumps(session_content, ensure_ascii=False))
                                logger.debug(f"总结后session_content: {session_content}")
        except asyncio.TimeoutError:
            continue
        except Exception as e:
            # BUG FIX: this handler was commented out; an unhandled error then
            # killed the task without setting chat_finished_event, leaving the
            # connection loop in streaming_chat_lasting_handler spinning forever.
            logger.error(f"处理llm返回结果发生错误: {str(e)}")
            break
    chat_finished_event.set()


async def streaming_chat_lasting_handler(ws, db, redis):
    """Lasting streaming chat: wire input, ASR and LLM tasks together and
    close the websocket when the LLM pipeline finishes."""
    logger.debug("streaming chat lasting websocket 连接建立")
    user_input_q = asyncio.Queue()  # raw user audio frames
    llm_input_q = asyncio.Queue()   # text destined for the LLM
    input_finished_event = asyncio.Event()
    asr_finished_event = asyncio.Event()
    chat_finished_event = asyncio.Event()
    future_session_id = asyncio.Future()
    future_response_type = asyncio.Future()
    asyncio.create_task(scl_user_input_handler(ws, user_input_q, llm_input_q, future_session_id,
                                               future_response_type, input_finished_event))
    session_id = await future_session_id  # resolved from the first frame
    update_session_activity(session_id, db)
    response_type = await future_response_type
    tts_info = json.loads(get_session_content(session_id, redis, db)["tts_info"])
    llm_info = json.loads(get_session_content(session_id, redis, db)["llm_info"])
    asyncio.create_task(scl_asr_handler(session_id, user_input_q, llm_input_q,
                                        input_finished_event, asr_finished_event))
    asyncio.create_task(scl_llm_handler(ws, session_id, response_type, llm_info, tts_info,
                                        db, redis, llm_input_q, asr_finished_event,
                                        chat_finished_event))
    while not chat_finished_event.is_set():
        await asyncio.sleep(3)
    await ws.send_text(json.dumps({"type": "close", "code": 200, "msg": ""}, ensure_ascii=False))
    await ws.close()
    logger.debug("streaming chat lasting websocket 连接断开")
# ---------------------------------------------------------------------------------------------------


# -------------------------------- voice-call endpoint --------------------------------
async def voice_call_audio_producer(ws, audio_q, future, input_finished_event):
    """Receive audio from the websocket and feed fixed-size VAD frames into
    audio_q until the client sends is_close."""
    logger.debug("音频数据生产函数启动")
    is_future_done = False
    audio_data = ""
    while not input_finished_event.is_set():
        try:
            voice_call_data_json = json.loads(await asyncio.wait_for(ws.receive_text(), timeout=3))
            if not is_future_done:
                # First frame carries the session id.
                future.set_result(voice_call_data_json['meta_info']['session_id'])
                is_future_done = True
            if voice_call_data_json["is_close"]:
                input_finished_event.set()
                break
            else:
                audio_data += voice_call_data_json["audio"]
                # Slice the accumulated audio into 1280-byte VAD frames.
                while len(audio_data) > 1280:
                    vad_frame, audio_data = vad_preprocess(audio_data)
                    await audio_q.put(vad_frame)
        except KeyError as ke:
            # Heartbeat frames carry 'state'/'method' instead of audio fields.
            if 'state' in voice_call_data_json and 'method' in voice_call_data_json:
                logger.info(f"收到心跳包")
        except asyncio.TimeoutError:
            continue
        except Exception as e:
            logger.error(f"音频数据生产函数发生错误: {str(e)}")
            break


async def voice_call_audio_consumer(ws, session_id, audio_q, asr_result_q,
                                    input_finished_event, asr_finished_event):
    """Run VAD over queued frames; on sustained silence, finalize the ASR
    result, tag the speaker's emotion, queue it for the LLM and echo the
    recognized text back to the client."""
    logger.debug("音频数据消费者函数启动")
    vad = VAD()
    current_message = ""
    vad_count = 0
    is_signup = False
    audio = ""
    while not (input_finished_event.is_set() and audio_q.empty()):
        try:
            if not is_signup:
                asr.session_signup(session_id)
                is_signup = True
            audio_data = await asyncio.wait_for(audio_q.get(), timeout=3)
            if vad.is_speech(audio_data):
                if vad_count > 0:
                    vad_count -= 1  # speech resumes: decay the silence counter
                asr_result = asr.streaming_recognize(session_id, audio_data)
                audio += audio_data
                current_message += ''.join(asr_result['text'])
            else:
                vad_count += 1
                if vad_count >= 25:  # 25 consecutive silent frames => utterance finished
                    asr_result = asr.streaming_recognize(session_id, audio_data, is_end=True)
                    if current_message:
                        current_message = asr.punctuation_correction(current_message)
                        audio += audio_data
                        emotion_dict = asr.emtion_recognition(audio)  # emotion recognition
                        if not isinstance(emotion_dict, str):
                            max_index = emotion_dict['scores'].index(max(emotion_dict['scores']))
                            current_message = f"{current_message}当前说话人的情绪:{emotion_dict['labels'][max_index]}"
                        logger.debug(f"检测到静默,用户输入为:{current_message}")
                        await asr_result_q.put(current_message)
                        audio = ""
                        text_response = {"type": "user_text", "code": 200, "msg": current_message}
                        await ws.send_text(json.dumps(text_response, ensure_ascii=False))
                    current_message = ""
                    vad_count = 0
        except asyncio.TimeoutError:
            continue
        except Exception as e:
            # BUG FIX: the original also signed out here and then again after
            # the loop, double-signing-out the ASR session on errors.
            logger.error(f"音频数据消费者函数发生错误: {str(e)}")
            break
    asr.session_signout(session_id)
    asr_finished_event.set()


async def voice_call_llm_handler(ws, session_id, llm_info, tts_info, db, redis,
                                 asr_result_q, asr_finished_event, voice_call_end_event):
    """Per recognized utterance, call the streaming LLM and send back
    synthesized audio plus text; persist the turn and summarize the history
    when the token budget runs high."""
    logger.debug("asr结果消费以及llm返回生产函数启动")
    llm_response = ""
    current_sentence = ""
    is_first = True
    is_end = False
    while not (asr_finished_event.is_set() and asr_result_q.empty()):
        try:
            session_content = get_session_content(session_id, redis, db)
            messages = json.loads(session_content["messages"])
            user_info = json.loads(session_content["user_info"])
            current_message = await asyncio.wait_for(asr_result_q.get(), timeout=3)
            messages.append({'role': 'user', "content": current_message})
            payload = json.dumps({
                "model": llm_info["model"],
                "stream": True,
                "messages": messages,
                "max_tokens": 10000,
                "temperature": llm_info["temperature"],
                "top_p": llm_info["top_p"]
            })
            headers = {
                'Authorization': f"Bearer {Config.MINIMAX_LLM.API_KEY}",
                'Content-Type': 'application/json'
            }
            target_se = get_emb(session_id, db)
            async with aiohttp.ClientSession() as client:
                # Send the streaming LLM request.
                async with client.post(Config.MINIMAX_LLM.URL, headers=headers, data=payload) as response:
                    async for chunk in response.content.iter_any():
                        token_count, chunk_data = parseChunkDelta(chunk)
                        is_end = token_count > 0  # positive token count marks the final chunk
                        if not is_end:
                            llm_response += chunk_data
                        sentences, current_sentence, is_first = split_string_with_punctuation(
                            current_sentence, chunk_data, is_first, is_end)
                        for sentence in sentences:
                            # Voice calls always return audio; use the base
                            # voice when no cloned embedding exists.
                            if target_se.size == 0:
                                audio, sr = tts._base_tts(text=sentence,
                                                          noise_scale=tts_info["noise_scale"],
                                                          noise_scale_w=tts_info["noise_scale_w"],
                                                          speed=tts_info["length_scale"])
                            else:
                                audio, sr = tts.synthesize(text=sentence,
                                                           noise_scale=tts_info["noise_scale"],
                                                           noise_scale_w=tts_info["noise_scale_w"],
                                                           speed=tts_info["length_scale"],
                                                           target_se=target_se)
                            text_response = {"type": "llm_text", "code": 200, "msg": sentence}
                            await ws.send_bytes(audio)  # binary audio stream
                            await ws.send_text(json.dumps(text_response, ensure_ascii=False))
                            logger.debug(f"llm返回结果: {sentence}")
                        if is_end:
                            logger.debug(f"llm返回结果: {llm_response}")
                            await ws.send_text(json.dumps({"type": "end", "code": 200, "msg": ""},
                                                          ensure_ascii=False))
                            is_end = False
                            # Persist the completed assistant turn.
                            messages.append({'role': 'assistant', "content": llm_response})
                            session_content["messages"] = json.dumps(messages, ensure_ascii=False)
                            redis.set(session_id, json.dumps(session_content, ensure_ascii=False))
                            is_first = True
                            llm_response = ""
                            # Summarize when usage exceeds 70% of the summarizer budget.
                            if token_count > summarizer.max_token * 0.7:
                                system_prompt = messages[0]['content']
                                summary = await summarizer.summarize(messages)
                                events = user_info['events']
                                events.append(summary['event'])
                                session_content['messages'] = json.dumps(
                                    [{'role': 'system', 'content': system_prompt}], ensure_ascii=False)
                                session_content['user_info'] = json.dumps(
                                    {'character': summary['character'], 'events': events}, ensure_ascii=False)
                                redis.set(session_id, json.dumps(session_content, ensure_ascii=False))
                                logger.debug(f"总结后session_content: {session_content}")
        except asyncio.TimeoutError:
            continue
        except Exception as e:
            logger.error(f"处理llm返回结果发生错误: {str(e)}")
            break
    voice_call_end_event.set()


async def voice_call_handler(ws, db, redis):
    """Voice-call endpoint: wire producer, VAD/ASR consumer and LLM tasks
    together and close the websocket when the call ends."""
    logger.debug("voice_call websocket 连接建立")
    audio_q = asyncio.Queue()        # raw audio frames
    asr_result_q = asyncio.Queue()   # recognized utterances
    input_finished_event = asyncio.Event()   # client closed its input
    asr_finished_event = asyncio.Event()     # recognition pipeline drained
    voice_call_end_event = asyncio.Event()   # whole call finished
    future = asyncio.Future()  # resolves to the session id from the first frame
    asyncio.create_task(voice_call_audio_producer(ws, audio_q, future, input_finished_event))
    session_id = await future
    update_session_activity(session_id, db)
    tts_info = json.loads(get_session_content(session_id, redis, db)["tts_info"])
    llm_info = json.loads(get_session_content(session_id, redis, db)["llm_info"])
    asyncio.create_task(voice_call_audio_consumer(ws, session_id, audio_q, asr_result_q,
                                                  input_finished_event, asr_finished_event))
    asyncio.create_task(voice_call_llm_handler(ws, session_id, llm_info, tts_info, db, redis,
                                               asr_result_q, asr_finished_event,
                                               voice_call_end_event))
    while not voice_call_end_event.is_set():
        await asyncio.sleep(3)
    await ws.close()
    logger.debug("voice_call websocket 连接断开")
# ------------------------------------------------------------------------------------------