pun_emo_speaker_utils/takway/stt/modified_funasr.py

168 lines
7.1 KiB
Python

from takway.stt.funasr_utils import FunAutoSpeechRecognizer
from takway.stt.punctuation_utils import CTTRANSFORMER, Punctuation
from takway.stt.emotion_utils import FUNASRFINETUNE, Emotion
from takway.stt.speaker_ver_utils import ERES2NETV2, DEFALUT_SAVE_PATH, speaker_verfication
import os
import pdb
import numpy as np
class ModifiedRecognizer(FunAutoSpeechRecognizer):
def __init__(self,
use_punct=True,
use_emotion=False,
use_speaker_ver=True):
super().__init__(
model_path="paraformer-zh-streaming",
device="cuda",
RATE=16000,
cfg_path=None,
debug=False,
chunk_ms=480,
encoder_chunk_look_back=4,
decoder_chunk_look_back=1)
self.use_punct = use_punct
self.use_emotion = use_emotion
self.use_speaker_ver = use_speaker_ver
if use_punct:
self.puctuation_model = Punctuation(**CTTRANSFORMER)
if use_emotion:
self.emotion_model = Emotion(**FUNASRFINETUNE)
if use_speaker_ver:
self.speaker_ver_model = speaker_verfication(**ERES2NETV2)
def initialize_speaker(self, speaker_1_wav):
if not self.use_speaker_ver:
raise NotImplementedError("no access")
if speaker_1_wav.endswith(".npy"):
self.save_speaker_path = speaker_1_wav
elif speaker_1_wav.endswith('.wav'):
self.save_speaker_path = os.path.join(DEFALUT_SAVE_PATH,
os.path.basename(speaker_1_wav).replace(".wav", ".npy"))
# self.save_speaker_path = DEFALUT_SAVE_PATH
self.speaker_ver_model.wav2embeddings(speaker_1_wav, self.save_speaker_path)
else:
raise TypeError("only support [.npy] or [.wav].")
def speaker_ver(self, speaker_2_wav):
if not self.use_speaker_ver:
raise NotImplementedError("no access")
if not hasattr(self, "save_speaker_path"):
raise NotImplementedError("please initialize speaker first")
# pdb.set_trace()
return self.speaker_ver_model.verfication(base_emb=self.save_speaker_path,
speaker_2_wav=speaker_2_wav) == 'yes'
def recognize(self, audio_data):
audio_data = self.check_audio_type(audio_data)
if self.use_speaker_ver:
if self.speaker_ver_model.verfication(self.save_speaker_path,
speaker_2_wav=audio_data) == 'no':
return "Other People"
result = self.asr_model.generate(input=audio_data,
batch_size_s=300,
hotword=self.hotwords)
text = ''
for res in result:
text += res['text']
if self.use_punct:
text = self.puctuation_model.process(text+'#', append_period=False).replace('#', '')
return text
def recognize_emotion(self, audio_data):
audio_data = self.check_audio_type(audio_data)
if self.use_speaker_ver:
if self.speaker_ver_model.verfication(self.save_speaker_path,
speaker_2_wav=audio_data) == 'no':
return "Other People"
if self.use_emotion:
return self.emotion_model.process(audio_data)
else:
raise NotImplementedError("no access")
def streaming_recognize(self, audio_data, is_end=False, auto_det_end=False):
"""recognize partial result
Args:
audio_data: bytes or numpy array, partial audio data
is_end: bool, whether the audio data is the end of a sentence
auto_det_end: bool, whether to automatically detect the end of a audio data
"""
audio_data = self.check_audio_type(audio_data)
if self.use_speaker_ver:
if self.speaker_ver_model.verfication(self.save_speaker_path,
speaker_2_wav=audio_data) == 'no':
return "Other People"
text_dict = dict(text=[], is_end=is_end)
if self.audio_cache is None:
self.audio_cache = audio_data
else:
# print(f"audio_data: {audio_data.shape}, audio_cache: {self.audio_cache.shape}")
if self.audio_cache.shape[0] > 0:
self.audio_cache = np.concatenate([self.audio_cache, audio_data], axis=0)
if not is_end and self.audio_cache.shape[0] < self.chunk_partial_size:
return text_dict
total_chunk_num = int((len(self.audio_cache)-1)/self.chunk_partial_size)
if is_end:
# if the audio data is the end of a sentence, \
# we need to add one more chunk to the end to \
# ensure the end of the sentence is recognized correctly.
auto_det_end = True
if auto_det_end:
total_chunk_num += 1
# print(f"chunk_size: {self.chunk_size}, chunk_stride: {self.chunk_partial_size}, total_chunk_num: {total_chunk_num}, len: {len(self.audio_cache)}")
end_idx = None
for i in range(total_chunk_num):
if auto_det_end:
is_end = i == total_chunk_num - 1
start_idx = i*self.chunk_partial_size
if auto_det_end:
end_idx = (i+1)*self.chunk_partial_size if i < total_chunk_num-1 else -1
else:
end_idx = (i+1)*self.chunk_partial_size if i < total_chunk_num else -1
# print(f"cut part: {start_idx}:{end_idx}, is_end: {is_end}, i: {i}, total_chunk_num: {total_chunk_num}")
# t_stamp = time.time()
speech_chunk = self.audio_cache[start_idx:end_idx]
# TODO: exceptions processes
try:
res = self.asr_model.generate(input=speech_chunk, cache=self.asr_cache, is_final=is_end, chunk_size=self.chunk_size, encoder_chunk_look_back=self.encoder_chunk_look_back, decoder_chunk_look_back=self.decoder_chunk_look_back)
except ValueError as e:
print(f"ValueError: {e}")
continue
if self.use_punct:
text_dict['text'].append(self.puctuation_model.process(self.text_postprecess(res[0], data_id='text'), cache=text_dict))
else:
text_dict['text'].append(self.text_postprecess(res[0], data_id='text'))
# print(f"each chunk time: {time.time()-t_stamp}")
if is_end:
self.audio_cache = None
self.asr_cache = {}
else:
if end_idx:
self.audio_cache = self.audio_cache[end_idx:] # cut the processed part from audio_cache
text_dict['is_end'] = is_end
if self.use_punct and is_end:
text_dict['text'].append(self.puctuation_model.process('#', cache=text_dict).replace('#', ''))
# print(f"text_dict: {text_dict}")
return text_dict