from .funasr_utils import FunAutoSpeechRecognizer
from .punctuation_utils import CTTRANSFORMER, Punctuation
from .emotion_utils import FUNASRFINETUNE, Emotion
from .speaker_ver_utils import ERES2NETV2, DEFALUT_SAVE_PATH, speaker_verfication
import os
import numpy as np


class ModifiedRecognizer(FunAutoSpeechRecognizer):
    def __init__(self,
                 use_punct=True,
                 use_emotion=False,
                 use_speaker_ver=True):

        # Create the base FunASR streaming model, which recognizes speech into
        # unpunctuated text.
        super().__init__(
            model_path="paraformer-zh-streaming",
            device="cuda",
            RATE=16000,
            cfg_path=None,
            debug=False,
            chunk_ms=480,
            encoder_chunk_look_back=4,
            decoder_chunk_look_back=1)

        # Record which optional capabilities are enabled.
        self.use_punct = use_punct
        self.use_emotion = use_emotion
        self.use_speaker_ver = use_speaker_ver

        # Punctuation restoration model
        if use_punct:
            self.puctuation_model = Punctuation(**CTTRANSFORMER)

        # Emotion recognition model
        if use_emotion:
            self.emotion_model = Emotion(**FUNASRFINETUNE)

        # Speaker verification model
        if use_speaker_ver:
            self.speaker_ver_model = speaker_verfication(**ERES2NETV2)

    def initialize_speaker(self, speaker_1_wav):
        """
        Register the input audio (speaker_1_wav) as the target speaker for
        speaker verification and save its embedding locally.
        """
        if not self.use_speaker_ver:
            raise NotImplementedError("speaker verification is disabled (use_speaker_ver=False)")
        if speaker_1_wav.endswith(".npy"):
            # Already an embedding file; use it directly.
            self.save_speaker_path = speaker_1_wav
        elif speaker_1_wav.endswith(".wav"):
            self.save_speaker_path = os.path.join(
                DEFALUT_SAVE_PATH,
                os.path.basename(speaker_1_wav).replace(".wav", ".npy"))
            # self.save_speaker_path = DEFALUT_SAVE_PATH
            self.speaker_ver_model.wav2embeddings(speaker_1_wav, self.save_speaker_path)
        else:
            raise TypeError("only [.npy] or [.wav] files are supported.")

    def speaker_ver(self, speaker_2_wav):
        """
        Speaker verification: check whether the input audio belongs to the
        target speaker. Returns True if it does, False otherwise.
        """
        if not self.use_speaker_ver:
            raise NotImplementedError("speaker verification is disabled (use_speaker_ver=False)")
        if not hasattr(self, "save_speaker_path"):
            raise NotImplementedError("please call initialize_speaker() first")

        # self.speaker_ver_model.verfication returns the string 'yes' or 'no'.
        return self.speaker_ver_model.verfication(
            base_emb=self.save_speaker_path,
            speaker_2_wav=speaker_2_wav) == 'yes'

    def recognize(self, audio_data):
        """
        Non-streaming speech recognition. Returns the recognized text as a str.
        """
        audio_data = self.check_audio_type(audio_data)

        # Speaker verification
        if self.use_speaker_ver:
            if self.speaker_ver_model.verfication(
                    self.save_speaker_path,
                    speaker_2_wav=audio_data) == 'no':
                return "Other People"

        # Speech recognition
        result = self.asr_model.generate(input=audio_data,
                                         batch_size_s=300,
                                         hotword=self.hotwords)
        text = ''
        for res in result:
            text += res['text']

        # Add punctuation
        if self.use_punct:
            text = self.puctuation_model.process(text + '#', append_period=False).replace('#', '')

        return text

    def recognize_emotion(self, audio_data):
        """
        Emotion recognition. Returns:
        1. the string "Other People" if the speaker is not the target speaker;
        2. a dict {"Labels": List[str], "scores": List[int]} if the speaker is
           the target speaker.
        """
        audio_data = self.check_audio_type(audio_data)

        # Speaker verification
        if self.use_speaker_ver:
            if self.speaker_ver_model.verfication(
                    self.save_speaker_path,
                    speaker_2_wav=audio_data) == 'no':
                return "Other People"

        if self.use_emotion:
            return self.emotion_model.process(audio_data)
        else:
            raise NotImplementedError("emotion recognition is disabled (use_emotion=False)")

    def streaming_recognize(self, session_id, audio_data, is_end=False, auto_det_end=False):
        """Recognize a partial (streaming) result.

        Args:
            audio_data: bytes or numpy array, partial audio data
            is_end: bool, whether this audio data ends the sentence
            auto_det_end: bool, whether to automatically detect the end of the audio data

        Returns:
        1. the string "Other People" if the speaker is not the target speaker;
        2. a dict {"text": List[str], "is_end": bool} if the speaker is the
           target speaker.
        """
        audio_cache = self.audio_cache[session_id]
        asr_cache = self.asr_cache[session_id]
        text_dict = dict(text=[], is_end=is_end)
        audio_data = self.check_audio_type(audio_data)

        # Speaker verification
        if self.use_speaker_ver:
            if self.speaker_ver_model.verfication(
                    self.save_speaker_path,
                    speaker_2_wav=audio_data) == 'no':
                return "Other People"

        # Speech recognition
        if audio_cache is None:
            audio_cache = audio_data
        else:
            # print(f"audio_data: {audio_data.shape}, audio_cache: {audio_cache.shape}")
            if audio_cache.shape[0] > 0:
                audio_cache = np.concatenate([audio_cache, audio_data], axis=0)

        # Not enough audio buffered for a full chunk yet: keep caching and return.
        if not is_end and audio_cache.shape[0] < self.chunk_partial_size:
            self.audio_cache[session_id] = audio_cache
            return text_dict

        total_chunk_num = int((len(audio_cache) - 1) / self.chunk_partial_size)

        if is_end:
            # If the audio data is the end of a sentence, we need to add one
            # more chunk to the end to ensure the end of the sentence is
            # recognized correctly.
            auto_det_end = True

        if auto_det_end:
            total_chunk_num += 1

        # print(f"chunk_size: {self.chunk_size}, chunk_stride: {self.chunk_partial_size}, total_chunk_num: {total_chunk_num}, len: {len(audio_cache)}")
        end_idx = None
        for i in range(total_chunk_num):
            if auto_det_end:
                is_end = i == total_chunk_num - 1
            start_idx = i * self.chunk_partial_size
            if auto_det_end:
                end_idx = (i + 1) * self.chunk_partial_size if i < total_chunk_num - 1 else -1
            else:
                end_idx = (i + 1) * self.chunk_partial_size if i < total_chunk_num else -1
            # print(f"cut part: {start_idx}:{end_idx}, is_end: {is_end}, i: {i}, total_chunk_num: {total_chunk_num}")
            # t_stamp = time.time()

            speech_chunk = audio_cache[start_idx:end_idx]

            # TODO: handle exceptions more robustly
            try:
                res = self.asr_model.generate(input=speech_chunk,
                                              cache=asr_cache,
                                              is_final=is_end,
                                              chunk_size=self.chunk_size,
                                              encoder_chunk_look_back=self.encoder_chunk_look_back,
                                              decoder_chunk_look_back=self.decoder_chunk_look_back)
            except ValueError as e:
                print(f"ValueError: {e}")
                continue

            # Add punctuation
            if self.use_punct:
                text_dict['text'].append(self.puctuation_model.process(
                    self.text_postprecess(res[0], data_id='text'), cache=text_dict))
            else:
                text_dict['text'].append(self.text_postprecess(res[0], data_id='text'))

            # print(f"each chunk time: {time.time()-t_stamp}")

        if is_end:
            audio_cache = None
            asr_cache = {}
        else:
            if end_idx:
                # Cut the processed part from audio_cache.
                audio_cache = audio_cache[end_idx:]
        text_dict['is_end'] = is_end

        if self.use_punct and is_end:
            text_dict['text'].append(self.puctuation_model.process('#', cache=text_dict).replace('#', ''))

        self.audio_cache[session_id] = audio_cache
        self.asr_cache[session_id] = asr_cache
        # print(f"text_dict: {text_dict}")
        return text_dict
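

# Usage sketch (not part of the original module): shows how ModifiedRecognizer
# might be driven end to end. The audio path and session id below are
# hypothetical, and it is assumed here that the base FunAutoSpeechRecognizer
# exposes the per-session `audio_cache` / `asr_cache` dicts and
# `chunk_partial_size` attribute that streaming_recognize relies on.
if __name__ == "__main__":
    recognizer = ModifiedRecognizer(use_punct=True,
                                    use_emotion=False,
                                    use_speaker_ver=True)

    # Register the target speaker once; later calls compare incoming audio
    # against this saved embedding. "target_speaker.wav" is a placeholder path.
    recognizer.initialize_speaker("target_speaker.wav")

    # Non-streaming recognition on one second of silence at 16 kHz (a stand-in
    # for real mono audio). Returns punctuated text, or "Other People" when the
    # voice does not match the registered speaker.
    audio = np.zeros(16000, dtype=np.float32)
    print(recognizer.recognize(audio))

    # Streaming recognition: feed chunk-sized pieces under one session id.
    # Whether the base class pre-creates the per-session caches is an
    # assumption; if it does not, they can be seeded as shown here.
    session_id = "demo-session"
    recognizer.audio_cache.setdefault(session_id, None)
    recognizer.asr_cache.setdefault(session_id, {})
    chunk = np.zeros(recognizer.chunk_partial_size, dtype=np.float32)
    partial = recognizer.streaming_recognize(session_id, chunk, is_end=False)
    final = recognizer.streaming_recognize(session_id, chunk, is_end=True)
    print(partial, final)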