import os import re from glob import glob from tqdm.auto import tqdm import soundfile as sf import numpy as np import torch from typing import Optional, Union # melo from melo.api import TTS from melo.utils import get_text_for_tts_infer # openvoice from .openvoice import se_extractor from .openvoice.api import ToneColorConverter from .openvoice.mel_processing import spectrogram_torch # torchaudio import torchaudio.functional as F # 存储 BASE SPEAKER 的 embedding(source_se) 的路径 current_file_path = os.path.abspath(__file__) utils_dir = os.path.dirname(os.path.dirname(current_file_path)) SOURCE_SE_DIR = os.path.join(utils_dir,'assets','ses') # 存储缓存文件的路径 CACHE_PATH = r"/tmp/openvoice_cache" OPENVOICE_BASE_TTS={ "model_type": "open_voice_base_tts", # 转换的语言 "language": "ZH", } converter_path = os.path.join(os.path.dirname(current_file_path),'openvoice_model') OPENVOICE_TONE_COLOR_CONVERTER={ "model_type": "open_voice_converter", # 模型参数路径 "converter_path": converter_path, } class TextToSpeech: def __init__(self, use_tone_convert=True, device="cuda", debug:bool=False, ): self.debug = debug self.device = device self.use_tone_convert = use_tone_convert # 默认的源说话人 se self.source_se = None # 默认的目标说话人 se self.target_se = None self.initialize_base_tts(**OPENVOICE_BASE_TTS) if self.debug: print("use tone converter is", self.use_tone_convert) if self.use_tone_convert: self.initialize_tone_color_converter(**OPENVOICE_TONE_COLOR_CONVERTER) self.initialize_source_se() def initialize_tone_color_converter(self, **kwargs): """ 初始化 tone color converter """ model_type = kwargs.pop('model_type') self.tone_color_converter_model_type = model_type if model_type == 'open_voice_converter': # 加载模型 converter_path = kwargs.pop('converter_path') self.tone_color_converter = ToneColorConverter(f'{converter_path}/config.json', self.device) self.tone_color_converter.load_ckpt(f'{converter_path}/checkpoint.pth') if self.debug: print("load tone color converter successfully!") else: raise NotImplementedError(f"only [open_voice_converter] model type expected, but get [{model_type}]. ") def initialize_base_tts(self, **kwargs): """ 初始化 base tts model """ model_type = kwargs.pop('model_type') self.base_tts_model_type = model_type if model_type == "open_voice_base_tts": language = kwargs.pop('language') self.base_tts_model = TTS(language=language, device=self.device) speaker_ids = self.base_tts_model.hps.data.spk2id flag = False for speaker_key in speaker_ids.keys(): if flag: Warning(f'loaded model has more than one speaker, only the first speaker is used. The input speaker ids are {speaker_ids}') break self.speaker_id = speaker_ids[speaker_key] self.speaker_key = speaker_key.lower().replace('_', '-') flag=True if self.debug: print("load base tts model successfully!") # 第一次使用tts时会加载bert模型 self._base_tts("初始化bert模型。") else: raise NotImplementedError(f"only [open_voice_base_tts] model type expected, but get [{model_type}]. ") def initialize_source_se(self): """ 初始化source se """ if self.source_se is not None: Warning("replace source speaker embedding with new source speaker embedding!") self.source_se = torch.load(os.path.join(SOURCE_SE_DIR, f"{self.speaker_key}.pth"), map_location=self.device) def initialize_target_se(self, se: Union[np.ndarray, torch.Tensor]): """ 设置 target se param: se: 输入的se,类型可以为np.ndarray或torch.Tensor """ if self.target_se is not None: Warning("replace target source speaker embedding with new target speaker embedding!") if isinstance(se, np.ndarray): self.target_se = torch.tensor(se.astype(np.float32)).to(self.device) elif isinstance(se, torch.Tensor): self.target_se = se.float().to(self.device) #语音转numpy def audio2numpy(self, audio_data: Union[bytes, np.ndarray]): """ 将字节流的audio转为numpy类型,也可以传入numpy类型 return: np.float32 """ # TODO 是否归一化判断 if isinstance(audio_data, bytes): audio_data = np.frombuffer(audio_data, dtype=np.int16).flatten().astype(np.float32) / 32768.0 elif isinstance(audio_data, np.ndarray): if audio_data.dtype != np.float32: audio_data = audio_data.astype(np.int16).flatten().astype(np.float32) / 32768.0 else: raise TypeError(f"audio_data must be bytes or numpy array, but got {type(audio_data)}") return audio_data def audio2emb(self, audio_data: Union[bytes, np.ndarray], rate=44100, vad=True): """ 将输入的字节流/numpy类型的audio转为speaker embedding param: audio_data: 输入的音频字节 rate: 输入音频的采样率 vad: 是否使用vad模型 return: np.ndarray """ audio_data = self.audio2numpy(audio_data) if not os.path.exists(CACHE_PATH): os.makedirs(CACHE_PATH) from scipy.io import wavfile audio_path = os.path.join(CACHE_PATH, "tmp.wav") wavfile.write(audio_path, rate=rate, data=audio_data) se, _ = se_extractor.get_se(audio_path, self.tone_color_converter, target_dir=CACHE_PATH, vad=False) return se.cpu().detach().numpy() def tensor2numpy(self, audio_data: torch.Tensor): """ tensor类型转numpy """ return audio_data.cpu().detach().float().numpy() def numpy2bytes(self, audio_data): if isinstance(audio_data, np.ndarray): if audio_data.dtype == np.dtype('float32'): audio_data = np.int16(audio_data * np.iinfo(np.int16).max) audio_data = audio_data.tobytes() return audio_data else: raise TypeError("audio_data must be a numpy array") def _base_tts(self, text: str, sdp_ratio=0.2, noise_scale=0.6, noise_scale_w=0.8, speed=1.0, quite=True): """ base语音合成 param: text: 要合成的文本 sdp_ratio: SDP在合成时的占比, 理论上此比率越高, 合成的语音语调方差越大. noise_scale: 样本噪声张量的噪声标度。 noise_scale_w: 推理中随机持续时间预测器的噪声标度 speed: 说话语速 quite: 是否显示进度条 return: audio: tensor sr: 生成音频的采样速率 """ speaker_id = self.speaker_id if self.base_tts_model_type != "open_voice_base_tts": raise NotImplementedError("only [open_voice_base_tts] model type expected.") language = self.base_tts_model.language texts = self.base_tts_model.split_sentences_into_pieces(text, language, quite) audio_list = [] if quite: tx = texts else: tx = tqdm(texts) for t in tx: if language in ['EN', 'ZH_MIX_EN']: t = re.sub(r'([a-z])([A-Z])', r'\1 \2', t) device = self.base_tts_model.device bert, ja_bert, phones, tones, lang_ids = get_text_for_tts_infer(t, language, self.base_tts_model.hps, device, self.base_tts_model.symbol_to_id) with torch.no_grad(): x_tst = phones.to(device).unsqueeze(0) tones = tones.to(device).unsqueeze(0) lang_ids = lang_ids.to(device).unsqueeze(0) bert = bert.to(device).unsqueeze(0) ja_bert = ja_bert.to(device).unsqueeze(0) x_tst_lengths = torch.LongTensor([phones.size(0)]).to(device) del phones speakers = torch.LongTensor([speaker_id]).to(device) audio = self.base_tts_model.model.infer( x_tst, x_tst_lengths, speakers, tones, lang_ids, bert, ja_bert, sdp_ratio=sdp_ratio, noise_scale=noise_scale, noise_scale_w = noise_scale_w, length_scale = 1. / speed, )[0][0, 0].data del x_tst, tones, lang_ids, bert, ja_bert, x_tst_lengths, speakers audio_list.append(audio) torch.cuda.empty_cache() audio_segments = [] sr = self.base_tts_model.hps.data.sampling_rate for segment_data in audio_list: audio_segments.append(segment_data.reshape(-1).contiguous()) audio_segments.append(torch.tensor([0]*int((sr * 0.05) / speed), dtype=segment_data.dtype, device=segment_data.device)) audio_segments = torch.cat(audio_segments, dim=-1) if self.debug: print("generate base speech!") print("**********************,tts sr",sr) print(f"audio segment length is [{audio_segments.shape}]") return audio_segments, sr def _convert_tone(self, audio_data: torch.Tensor, source_se: Optional[np.ndarray]=None, target_se: Optional[np.ndarray]=None, tau :float=0.3, message :str="default"): """ 音色转换 param: audio_data: _base_tts输出的音频数据 source_se: 如果为None, 则使用self.source_se target_se: 如果为None, 则使用self.target_se tau: message: 水印信息 TODO return: audio: tensor sr: 生成音频的采样速率 """ if source_se is not None: source_se = torch.tensor(source_se.astype(np.float32)).to(self.device) if target_se is not None: target_se = torch.tensor(target_se.astype(np.float32)).to(self.device) if source_se is None: source_se = self.source_se if target_se is None: target_se = self.target_se hps = self.tone_color_converter.hps sr = hps.data.sampling_rate if self.debug: print("**********************************, convert sr", sr) audio_data = audio_data.float() with torch.no_grad(): y = audio_data.to(self.tone_color_converter.device) y = y.unsqueeze(0) spec = spectrogram_torch(y, hps.data.filter_length, sr, hps.data.hop_length, hps.data.win_length, center=False).to(self.tone_color_converter.device) spec_lengths = torch.LongTensor([spec.size(-1)]).to(self.tone_color_converter.device) audio = self.tone_color_converter.model.voice_conversion(spec, spec_lengths, sid_src=source_se, sid_tgt=target_se, tau=tau)[0][ 0, 0].data # audio = self.tone_color_converter.add_watermark(audio, message) if self.debug: print("tone color has been converted!") return audio, sr def synthesize(self, text: str, tts_info, source_se: Optional[np.ndarray]=None, target_se: Optional[np.ndarray]=None, sdp_ratio=0.2, quite=True, tau :float=0.3, message :str="default"): """ 整体pipeline _base_tts() _convert_tone() tensor2numpy() numpy2bytes() param: 见_base_tts和_convert_tone return: audio: 字节流音频数据 sr: 音频数据的采样率 """ audio, sr = self._base_tts(text, sdp_ratio=sdp_ratio, noise_scale=tts_info['noise_scale'], noise_scale_w=tts_info['noise_scale_w'], speed=tts_info['speed'], quite=quite) if self.use_tone_convert and target_se.size>0: tts_sr = self.base_tts_model.hps.data.sampling_rate converter_sr = self.tone_color_converter.hps.data.sampling_rate audio = F.resample(audio, tts_sr, converter_sr) audio, sr = self._convert_tone(audio, source_se=source_se, target_se=target_se, tau=tau, message=message) audio = self.tensor2numpy(audio) audio = self.numpy2bytes(audio) return audio, sr def save_audio(self, audio, sample_rate, save_path): """ 将numpy类型的音频数据保存至本地 param: audio: numpy类型的音频数据 sample_rate: 数据采样率 save_path: 保存路径 """ sf.write(save_path, audio, sample_rate) print(f"Audio saved to {save_path}")