"""Audio utilities for modified_funasr_demo.py."""
import base64
import io
import os
import wave

import numpy as np
import pyaudio


def decode_str2bytes(data):
    """Decode a Base64-encoded string into raw bytes.

    Args:
        data: Base64 text, or None.

    Returns:
        The decoded bytes, or None when ``data`` is None.
    """
    if data is None:
        return None
    return base64.b64decode(data.encode('utf-8'))


class BaseAudio:
    """Thin wrapper around a single PyAudio stream plus WAV helpers.

    Exactly one of ``input`` / ``output`` must be true; the stream is opened
    in the constructor with the given format, channel count and sample rate.
    """

    def __init__(self,
                 filename=None,
                 input=False,
                 output=False,
                 CHUNK=1024,
                 FORMAT=pyaudio.paInt16,
                 CHANNELS=1,
                 RATE=16000,
                 input_device_index=None,
                 output_device_index=None,
                 **kwargs):
        """Store the audio parameters and open the PyAudio stream.

        Args:
            filename: Optional default path used by ``write_wave`` when
                writing in ``'io'`` mode.
            input: Open the stream for recording.
            output: Open the stream for playback.
            CHUNK: Number of frames read per call by default.
            FORMAT: PyAudio sample format constant.
            CHANNELS: Number of audio channels.
            RATE: Sample rate in Hz.
            input_device_index: Passed through to ``PyAudio.open``.
            output_device_index: Passed through to ``PyAudio.open``.
            **kwargs: Extra keyword arguments forwarded to ``PyAudio.open``.

        Raises:
            AssertionError: If ``input`` and ``output`` are equal.
        """
        self.CHUNK = CHUNK
        self.FORMAT = FORMAT
        self.CHANNELS = CHANNELS
        self.RATE = RATE
        self.filename = filename
        assert input != output, (
            "input and output cannot be the same, "
            "but got input={} and output={}.".format(input, output))
        print("------------------------------------------")
        print(f"{'Input' if input else 'Output'} Audio Initialization: ")
        print(f"CHUNK: {self.CHUNK} \n"
              f"FORMAT: {self.FORMAT} \n"
              f"CHANNELS: {self.CHANNELS} \n"
              f"RATE: {self.RATE} \n"
              f"input_device_index: {input_device_index} \n"
              f"output_device_index: {output_device_index}")
        print("------------------------------------------")
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(format=FORMAT,
                                  channels=CHANNELS,
                                  rate=RATE,
                                  input=input,
                                  output=output,
                                  input_device_index=input_device_index,
                                  output_device_index=output_device_index,
                                  **kwargs)

    def load_audio_file(self, wav_file):
        """Read every frame of a WAV file and return the raw frame bytes.

        Args:
            wav_file: Path or file-like object accepted by ``wave.open``.

        Returns:
            The audio frames as bytes (the WAV header is not included).
        """
        with wave.open(wav_file, 'rb') as wf:
            params = wf.getparams()
            frames = wf.readframes(params.nframes)
            print("Audio file loaded.")
        return frames

    def check_audio_type(self, audio_data, return_type=None):
        """Normalize supported audio inputs to raw bytes (or a WAV container).

        Args:
            audio_data: One of
                * ``str`` - path of an existing WAV file, or a Base64 string
                  (any non-file string longer than 50 characters);
                * ``numpy.ndarray`` - float32 arrays are scaled to int16,
                  other dtypes are serialized as-is;
                * ``bytes`` - returned unchanged.
            return_type: ``None`` to get the raw bytes, otherwise ``'bytes'``
                or ``'io'`` as understood by ``write_wave``.

        Returns:
            Raw bytes when ``return_type`` is None, otherwise the result of
            ``write_wave``.

        Raises:
            AssertionError: On an invalid ``return_type`` or a short string
                that is not an existing file.
            TypeError: If ``audio_data`` has an unsupported type.
        """
        assert return_type in ['bytes', 'io', None], \
            "return_type should be 'bytes', 'io' or None."
        if isinstance(audio_data, str):
            # Check for a real file first so that long paths are not
            # mistaken for Base64 payloads by the length heuristic below.
            if os.path.isfile(audio_data):
                with wave.open(audio_data, 'rb') as wf:
                    audio_data = wf.readframes(wf.getnframes())
            else:
                assert len(audio_data) > 50, \
                    "audio_data should be a file path or a bytes object."
                audio_data = decode_str2bytes(audio_data)
        elif isinstance(audio_data, np.ndarray):
            if audio_data.dtype == np.dtype('float32'):
                # Clip before scaling: samples outside [-1, 1] would
                # otherwise overflow the int16 range.
                scaled = (np.clip(audio_data, -1.0, 1.0)
                          * np.iinfo(np.int16).max)
                audio_data = scaled.astype(np.int16)
            audio_data = audio_data.tobytes()
        elif isinstance(audio_data, bytes):
            pass
        else:
            raise TypeError(
                "audio_data must be bytes, numpy.ndarray or str, "
                f"but got {type(audio_data)}")

        if return_type is None:
            return audio_data
        return self.write_wave(None, [audio_data], return_type)

    def write_wave(self, filename, frames, return_type='io'):
        """Package audio frames either as a WAV container or as raw bytes.

        Args:
            filename: Target path or file-like object for ``'io'`` mode;
                None selects a fresh in-memory ``io.BytesIO``. When
                ``self.filename`` is set it takes precedence.
            frames: A bytes object or a list of bytes chunks.
            return_type: ``'io'`` to write a WAV file/buffer, ``'bytes'`` to
                get the concatenated raw frames.

        Returns:
            The written target for ``'io'``, the joined bytes for
            ``'bytes'``, and None for any other ``return_type``.

        Raises:
            TypeError: If ``frames`` is neither bytes nor a list.
        """
        if isinstance(frames, bytes):
            frames = [frames]
        if not isinstance(frames, list):
            raise TypeError(
                "frames should be a list of bytes or a bytes object, "
                "but got {}.".format(type(frames)))

        if return_type == 'io':
            if filename is None:
                filename = io.BytesIO()
            if self.filename:
                filename = self.filename
            return self.write_wave_io(filename, frames)
        elif return_type == 'bytes':
            return self.write_wave_bytes(frames)
        return None

    def write_wave_io(self, filename, frames):
        """Write audio data as WAV to a path or file-like object.

        Args:
            filename: [string or file-like object], file path or file-like
                object to write.
            frames: list of bytes, audio data to write.

        Returns:
            ``filename``; an ``io.BytesIO`` target is rewound to position 0.
        """
        # The context manager finalizes the WAV header even if a write fails.
        with wave.open(filename, 'wb') as wf:
            # Set the WAV file parameters.
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(b''.join(frames))
        if isinstance(filename, io.BytesIO):
            filename.seek(0)  # reset file pointer to beginning
        return filename

    def write_wave_bytes(self, frames):
        """Concatenate the frame chunks into a single bytes object."""
        return b''.join(frames)


class BaseRecorder(BaseAudio):
    """Input-side ``BaseAudio`` that reads audio from the opened stream."""

    def __init__(self,
                 input=True,
                 base_chunk_size=None,
                 RATE=16000,
                 **kwargs):
        """Open an input stream.

        Args:
            input: Forwarded to ``BaseAudio``; defaults to recording mode.
            base_chunk_size: Stored on the instance; falls back to
                ``self.CHUNK`` when None.
            RATE: Sample rate in Hz.
            **kwargs: Forwarded to ``BaseAudio.__init__``.
        """
        super().__init__(input=input, RATE=RATE, **kwargs)
        self.base_chunk_size = base_chunk_size
        if base_chunk_size is None:
            self.base_chunk_size = self.CHUNK

    def record(self, filename, duration=5, return_type='io', logger=None):
        """Record for ``duration`` seconds and package the result.

        Args:
            filename: Target passed to ``write_wave``.
            duration: Recording length in seconds.
            return_type: Passed to ``write_wave``.
            logger: Optional logger; progress is printed when None.

        Returns:
            Whatever ``write_wave`` returns for the recorded frames.
        """
        if logger is not None:
            logger.info("Recording started.")
        else:
            print("Recording started.")
        frames = []
        # Number of CHUNK-sized reads needed to cover the requested duration.
        for _ in range(0, int(self.RATE / self.CHUNK * duration)):
            data = self.stream.read(self.CHUNK, exception_on_overflow=False)
            frames.append(data)
        if logger is not None:
            logger.info("Recording stopped.")
        else:
            print("Recording stopped.")
        return self.write_wave(filename, frames, return_type)

    def record_chunk_voice(self,
                           return_type='bytes',
                           CHUNK=None,
                           exception_on_overflow=True,
                           queue=None):
        """Read one chunk from the stream.

        Args:
            return_type: None to get the data exactly as read, otherwise a
                ``write_wave`` return type.
            CHUNK: Number of frames to read; defaults to ``self.CHUNK``.
            exception_on_overflow: Forwarded to ``stream.read``.
            queue: Accepted for interface compatibility; not used here.

        Returns:
            The chunk as read, or the ``write_wave`` result.
        """
        data = self.stream.read(self.CHUNK if CHUNK is None else CHUNK,
                                exception_on_overflow=exception_on_overflow)
        if return_type is not None:
            return self.write_wave(None, [data], return_type)
        return data