# TakwayBoard/takway/audio_utils.py
# (source listing metadata: 579 lines, 22 KiB, Python)
import io
import os
import time
import pyaudio
import wave
import json
import warnings
import threading
import numpy as np
from collections import deque
from .common_utils import encode_bytes2str, decode_str2bytes
from takway.board import *
# `keyboard` is optional: it is only required when hd_trigger='keyboard'.
# Catch ImportError specifically instead of a bare `except`, which would
# also hide unrelated failures raised while importing the module.
try:
    import keyboard
except ImportError:
    pass
def play_audio(audio_data, type='base64'):
    """Play raw 16-bit mono PCM audio (22050 Hz) on the default output device.

    Args:
        audio_data: PCM byte string to play.
        type: unused legacy parameter kept for backward compatibility;
            despite the name, no base64 decoding happens here — callers
            must pass already-decoded PCM bytes.
    """
    p = pyaudio.PyAudio()
    stream = None
    try:
        stream = p.open(format=pyaudio.paInt16, channels=1, rate=22050, output=True)
        stream.write(audio_data)
        stream.stop_stream()
    finally:
        # BUG FIX: always release PortAudio resources, even when playback
        # raises — the original leaked the stream and the PyAudio instance.
        if stream is not None:
            stream.close()
        p.terminate()
# NOTE(review): the block below is disabled code kept inside a module-level
# string literal (it needs `librosa`, which this module does not import).
# Preserved verbatim as a reference implementation for resampling to 16 kHz.
'''
import librosa
def reshape_sample_rate(audio, sr_original=None, sr_target=16000):
# 获取原始采样率和音频数据
if isinstance(audio, tuple):
sr_original, audio_data = audio
elif isinstance(audio, bytes):
audio_data = np.frombuffer(audio, dtype=np.int16)
assert sr_original is not None, f"sr_original should be provided if audio is a \
numpy.ndarray, but got sr_original `{sr_original}`."
if isinstance(audio_data, np.ndarray):
if audio_data.dtype == np.dtype('int16'):
audio_data = audio_data.astype(np.float32) / np.iinfo(np.int16).max
assert audio_data.dtype == np.dtype('float32'), f"audio_data should be float32, \
but got {audio_data.dtype}."
else:
raise TypeError(f"audio_data should be numpy.ndarray, but got {type(audio_data)}.")
# 重新采样音频数据
audio_data_resampled = librosa.resample(audio_data, orig_sr=sr_original, target_sr=sr_target)
if audio_data_resampled.dtype == np.dtype('float32'):
audio_data_resampled = np.int16(audio_data_resampled * np.iinfo(np.int16).max)
# If the input was bytes, return the resampled data as bytes
if isinstance(audio, bytes):
audio_data_resampled = audio_data_resampled.tobytes()
return audio_data_resampled
# Example usage:
# If your audio data is in bytes:
# audio_bytes = b'...' # Your audio data as bytes
# audio_data_resampled = reshape_sample_rate(audio_bytes)
# If your audio data is in numpy int16:
# audio_int16 = np.array([...], dtype=np.int16) # Your audio data as numpy int16
# audio_data_resampled = reshape_sample_rate(audio_int16)
'''
# ####################################################### #
# base audio class
# ####################################################### #
class BaseAudio:
    """Base wrapper around a single PyAudio stream (input OR output).

    Holds the shared audio parameters (CHUNK / FORMAT / CHANNELS / RATE)
    and provides helpers for loading, normalizing and writing WAV data
    that the recorder/player subclasses reuse.
    """

    def __init__(self,
                 filename=None,
                 input=False,
                 output=False,
                 CHUNK=1024,
                 FORMAT=pyaudio.paInt16,
                 CHANNELS=1,
                 RATE=16000,
                 input_device_index=None,
                 output_device_index=None,
                 **kwargs):
        self.CHUNK = CHUNK
        self.FORMAT = FORMAT
        self.CHANNELS = CHANNELS
        self.RATE = RATE
        self.filename = filename
        # Exactly one of input/output must be True: one stream, one direction.
        assert input != output, "input and output cannot be the same, \
            but got input={} and output={}.".format(input, output)
        print("------------------------------------------")
        print(f"{'Input' if input else 'Output'} Audio Initialization: ")
        print(f"CHUNK: {self.CHUNK} \nFORMAT: {self.FORMAT} \nCHANNELS: {self.CHANNELS} \nRATE: {self.RATE} \ninput_device_index: {input_device_index} \noutput_device_index: {output_device_index}")
        print("------------------------------------------")
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(format=FORMAT,
                                  channels=CHANNELS,
                                  rate=RATE,
                                  input=input,
                                  output=output,
                                  input_device_index=input_device_index,
                                  output_device_index=output_device_index,
                                  **kwargs)

    def load_audio_file(self, wav_file):
        """Read every frame from a WAV file path and return raw PCM bytes."""
        with wave.open(wav_file, 'rb') as wf:
            params = wf.getparams()
            frames = wf.readframes(params.nframes)
            print("Audio file loaded.")
            return frames

    def check_audio_type(self, audio_data, return_type=None):
        """Normalize audio input (base64 str, file path, ndarray, bytes) to PCM.

        Args:
            audio_data: base64-encoded str (heuristically, len > 50), a WAV
                file path, a float32/int16 numpy array, or raw PCM bytes.
            return_type: None -> raw bytes; 'bytes' -> joined bytes;
                'io' -> WAV file-like object.
        """
        assert return_type in ['bytes', 'io', None], \
            "return_type should be 'bytes', 'io' or None."
        if isinstance(audio_data, str):
            # Heuristic: long strings are base64 payloads, short ones are paths.
            if len(audio_data) > 50:
                audio_data = decode_str2bytes(audio_data)
            else:
                assert os.path.isfile(audio_data), \
                    "audio_data should be a file path or a bytes object."
                # BUG FIX: close the wave handle (the original leaked it).
                with wave.open(audio_data, 'rb') as wf:
                    audio_data = wf.readframes(wf.getnframes())
        elif isinstance(audio_data, np.ndarray):
            if audio_data.dtype == np.dtype('float32'):
                # Scale normalized floats into the full int16 range.
                audio_data = np.int16(audio_data * np.iinfo(np.int16).max)
            audio_data = audio_data.tobytes()
        elif isinstance(audio_data, bytes):
            pass
        else:
            raise TypeError(f"audio_data must be bytes, numpy.ndarray or str, \
                but got {type(audio_data)}")
        if return_type is None:  # FIX: `is None` instead of `== None`
            return audio_data
        return self.write_wave(None, [audio_data], return_type)

    def write_wave(self, filename, frames, return_type='io'):
        """Write audio data to a file-like object ('io') or plain bytes.

        Args:
            filename: target path/file-like; None selects an in-memory BytesIO.
            frames: bytes or list of bytes.
            return_type: 'io' or 'bytes'; any other value returns None.
        """
        if isinstance(frames, bytes):
            frames = [frames]
        if not isinstance(frames, list):
            raise TypeError("frames should be \
                a list of bytes or a bytes object, \
                but got {}.".format(type(frames)))
        if return_type == 'io':
            if filename is None:
                filename = io.BytesIO()
            # NOTE(review): self.filename takes precedence over the filename
            # argument here — confirm this precedence is intended.
            if self.filename:
                filename = self.filename
            return self.write_wave_io(filename, frames)
        elif return_type == 'bytes':
            return self.write_wave_bytes(frames)

    def write_wave_io(self, filename, frames):
        """
        Write audio data to a path or file-like object as a WAV stream.

        Args:
            filename: [string or file-like object], file path or file-like object to write
            frames: list of bytes, audio data to write
        """
        wf = wave.open(filename, 'wb')
        # Stamp the stream's WAV parameters into the header.
        wf.setnchannels(self.CHANNELS)
        wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
        wf.setframerate(self.RATE)
        wf.writeframes(b''.join(frames))
        wf.close()
        if isinstance(filename, io.BytesIO):
            filename.seek(0)  # rewind so the caller reads from the start
        return filename

    def write_wave_bytes(self, frames):
        """Concatenate a list of byte chunks into a single bytes object."""
        return b''.join(frames)
# ####################################################### #
# play audio data from Speaker
# ####################################################### #
class AudioPlayer(BaseAudio):
    """Plays PCM audio through the speaker via an output PyAudio stream."""

    def __init__(self,
                 RATE=22050,
                 **kwargs):
        super().__init__(output=True, RATE=RATE, **kwargs)

    def play(self, audio_data):
        """Play audio (any format accepted by check_audio_type) to the stream.

        BUG FIX: the original raised NameError for empty input (loop index
        `i` referenced after a zero-iteration loop) and performed a final
        `write(audio_data[i+CHUNK:])` that was always an empty slice.
        """
        audio_data = self.check_audio_type(audio_data, return_type=None)
        # Write in CHUNK-sized slices; the last slice may be shorter.
        for offset in range(0, len(audio_data), self.CHUNK):
            self.stream.write(audio_data[offset:offset + self.CHUNK])

    def close(self):
        """Stop the stream and release all PortAudio resources."""
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()
# ####################################################### #
# record audio data from microphone
# ####################################################### #
class BaseRecorder(BaseAudio):
    """Microphone recorder built on an input PyAudio stream."""

    def __init__(self,
                 input=True,
                 base_chunk_size=None,
                 RATE=16000,
                 **kwargs):
        super().__init__(input=input, RATE=RATE, **kwargs)
        self.base_chunk_size = base_chunk_size
        if base_chunk_size is None:
            self.base_chunk_size = self.CHUNK

    def record(self,
               filename,
               duration=5,
               return_type='io',
               logger=None):
        """Record for `duration` seconds and return via BaseAudio.write_wave.

        Args:
            filename: target path/file-like (None -> BytesIO for 'io').
            duration: recording length in seconds.
            return_type: 'io' or 'bytes' (see BaseAudio.write_wave).
            logger: optional logger; falls back to print().
        """
        # Collapse the duplicated logger/print branches into one callable.
        log = logger.info if logger is not None else print
        log("Recording started.")
        frames = []
        # RATE / CHUNK reads per second, for `duration` seconds.
        for _ in range(0, int(self.RATE / self.CHUNK * duration)):
            data = self.stream.read(self.CHUNK, exception_on_overflow=False)
            frames.append(data)
        log("Recording stopped.")
        return self.write_wave(filename, frames, return_type)

    def record_chunk_voice(self,
                           return_type='bytes',
                           CHUNK=None,
                           exception_on_overflow=True,
                           queue=None):
        """Read one chunk from the stream and optionally convert it.

        `queue` is accepted for interface compatibility but is unused here.
        """
        data = self.stream.read(self.CHUNK if CHUNK is None else CHUNK,
                                exception_on_overflow=exception_on_overflow)
        if return_type is not None:
            return self.write_wave(None, [data], return_type)
        return data
class HDRecorder(BaseRecorder):
    """Recorder gated by a hardware trigger (keyboard key or board button)."""

    def __init__(self,
                 board=None,
                 hd_trigger='keyboard',
                 keyboard_key='space',
                 voice_trigger=True,
                 hd_chunk_size=None,
                 hd_detect_threshold=50,
                 **kwargs):
        super().__init__(**kwargs)
        assert hd_trigger in ['keyboard', 'button']
        self.hd_trigger = hd_trigger
        self.voice_trigger = voice_trigger
        self.hd_chunk_size = hd_chunk_size
        if hd_chunk_size is None:
            self.hd_chunk_size = self.base_chunk_size
        # FIX: `board is None` instead of `board == None` (identity test).
        if board is None:
            assert hd_trigger == 'keyboard', "board should be `None` if hd_trigger is `keyboard`."
            self.keyboard_key = keyboard_key
            self.hardware = Keyboard(hd_trigger, keyboard_key, hd_detect_threshold)
        else:
            assert hd_trigger == 'button', f"hd_trigger should be `button` if board is `v329` or `orangepi`, but got `{hd_trigger}`."
            if board == 'v329':
                self.hardware = V329(hd_trigger, hd_detect_threshold)
            elif board == 'orangepi':
                self.hardware = OrangePi(hd_trigger, hd_detect_threshold, enable_start_light=True)
        print(f"Using {hd_trigger} as hardware trigger.")

    def wait_for_hardware_pressed(self):
        """Block until the configured hardware trigger is pressed."""
        return self.hardware.wait_for_hardware_pressed()

    @property
    def is_hardware_pressed(self):
        # Delegates to the board/keyboard abstraction.
        return self.hardware.is_hardware_pressed

    def record_hardware(self, return_type='bytes'):
        """Record audio for as long as the hardware trigger is held down."""
        print("Recording started for hardware trigger.")
        frames = []
        self.wait_for_hardware_pressed()
        while True:
            if self.hd_trigger == 'keyboard':
                if keyboard.is_pressed(self.keyboard_key):
                    print("recording...")
                    data = self.record_chunk_voice(
                        CHUNK=self.CHUNK,
                        return_type=None,
                        exception_on_overflow=False)
                    frames.append(data)
                else:
                    break
            elif self.hd_trigger == 'button':
                # NOTE(review): get_button_status is not defined on this class
                # or its visible bases — confirm it exists on the board API.
                if self.get_button_status():
                    data = self.stream.read(self.CHUNK)
                    frames.append(data)
                else:
                    break
            else:
                # FIX: removed dead `recording = False` before the raise.
                raise ValueError("hd_trigger should be 'keyboard' or 'button'.")
        # FIX: the original placed this print after `break`, so it never ran.
        print("Recording stopped.")
        return self.write_wave(self.filename, frames, return_type)
# NOTE(review): the block below is disabled legacy code kept inside a string
# literal (older keyboard/button record methods). Preserved verbatim.
'''
def record(self, return_type='bytes', queue=None):
if self.hd_trigger == 'all':
value_list = [] # 用于记录value的状态
if keyboard.is_pressed(self.keyboard_key):
audio_data = self.record_keyboard(return_type, queue)
elif self.button.get_value() == 0:
if self.get_button_status():
audio_data = self.record_button(return_type, queue)
else:
audio_data = self.record_voice(return_type, queue)
elif self.hd_trigger == 'keyboard':
print("Press SPACE to start recording.")
keyboard.wait("space")
audio_data = self.record_keyboard(return_type, queue)
elif self.hd_trigger == 'button':
print("Touch to start recording...")
if self.button.get_value() == 0:
if self.get_button_status():
audio_data = self.record_button(return_type, queue)
else:
audio_data = self.record_voice(return_type, queue)
return audio_data
def record_keyboard(self, return_type='bytes', queue=None):
"""record audio when keyboard pressing"""
print("Recording started.")
frames = []
recording = True
while recording:
if keyboard.is_pressed(self.keyboard_key):
data = self.stream.read(self.CHUNK)
frames.append(data)
else:
recording = False
print("Recording stopped.")
return self.write_wave(self.filename, frames, return_type)
def record_button(self, return_type='bytes', queue=None):
"""record audio when button pressing"""
print("Recording started.")
frames = []
recording = True
while recording:
value = self.button.get_value()
if value == 0:
data = self.stream.read(CHUNK)
frames.append(data)
else:
recording = False
print("Recording stopped.")
return self.write_wave(self.filename, frames, return_type)
'''
# ####################################################### #
# record audio data from microphone with VAD
# ####################################################### #
# `webrtcvad` is optional: only VADRecorder needs it. Catch ImportError
# specifically — a bare `except` would also hide unrelated import failures.
try:
    import webrtcvad
    webrtcvad_available = True
except ImportError:
    warnings.warn("webrtcvad module not found, please install it if use `vad` hd_trigger.")
    webrtcvad_available = False
class VADRecorder(HDRecorder):
    """HDRecorder extended with WebRTC voice-activity detection (VAD)."""

    def __init__(self, vad_sensitivity=0, frame_duration=30, vad_buffer_size=7, min_act_time=0.3, **kwargs):
        super().__init__(**kwargs)
        if webrtcvad_available:
            self.vad = webrtcvad.Vad(vad_sensitivity)
        self.vad_buffer_size = vad_buffer_size
        # Frame size in samples for a `frame_duration` ms frame at self.RATE.
        self.vad_chunk_size = int(self.RATE * frame_duration / 1000)
        self.min_act_time = min_act_time  # minimum speech duration, seconds
        self.is_currently_speaking = False
        self.frames = []

    def is_speech(self, data):
        """Return True if `data` (one PCM frame) contains speech."""
        return self.vad.is_speech(data, self.RATE)

    def vad_filter(self, data):
        # Placeholder — not implemented.
        pass

    def vad_record(self, return_type='io', CHUNK=None, queue=None, save_file=False):
        """Record until one speech segment is detected and ends; return its bytes.

        NOTE(review): `return_type`, `CHUNK` and `queue` are currently ignored
        (the original always returned bytes); kept for interface compatibility.
        """
        # BUG FIX: reset per-call state. Previously `self.frames` and
        # `self.is_currently_speaking` leaked across calls, so a second
        # recording also returned the first recording's audio.
        self.frames = []
        self.is_currently_speaking = False
        all_frames = []
        buffer_size = self.vad_buffer_size
        # BUG FIX: deque(maxlen=...) already evicts from the left on append;
        # the original's explicit popleft() after each append dropped an
        # extra element and shrank the detection window below buffer_size.
        active_buffer = deque([False] * buffer_size, maxlen=buffer_size)
        audio_buffer = deque(maxlen=buffer_size)
        silence_buffer = deque([True] * buffer_size, maxlen=buffer_size)
        print("vad_recorded_audio VAD started. Press Ctrl+C to stop.")
        try:
            while True:
                data = self.stream.read(self.vad_chunk_size)
                all_frames.append(data)
                # Hoisted: the original called is_speech twice per frame.
                speech = self.is_speech(data)
                print(f"VAD processing..., is_speech: {speech}")
                if speech:
                    active_buffer.append(True)
                    silence_buffer.append(False)
                    # Keep a short pre-roll so speech onsets are not clipped.
                    audio_buffer.append(data)
                    if all(active_buffer):
                        if not self.is_currently_speaking:
                            print("Speech start detected")
                            self.is_currently_speaking = True
                            self.frames.extend(audio_buffer)  # prepend buffered onset
                    if self.is_currently_speaking:
                        self.frames.append(data)
                else:
                    active_buffer.append(False)
                    silence_buffer.append(True)
                    # While speaking, a full window of silence ends the segment.
                    if self.is_currently_speaking:
                        if all(silence_buffer):
                            print("Speech end detected")
                            break
        except KeyboardInterrupt:
            print("KeyboardInterrupt")
        finally:
            print("Stopping...")
            if len(all_frames) > 0:
                print(f"ALL frame: {len(all_frames)}")
                print(f"ASR frame: {len(self.frames)}")
                if save_file:
                    self.write_wave(f"output_{time.time()}_all.wav", all_frames)
                    self.write_wave(f"output_{time.time()}.wav", self.frames)
        return self.write_wave(None, self.frames, return_type='bytes')
# ####################################################### #
# record audio data from microphone with PicoVoice hot words detection
# ####################################################### #
import struct
from datetime import datetime
import pvporcupine
class PicovoiceRecorder(VADRecorder):
    """VADRecorder with Picovoice Porcupine hot-word (wake-word) detection."""

    def __init__(self,
                 access_key,
                 keywords=None,
                 keyword_paths=None,
                 model_path=None,
                 sensitivities=0.5,
                 library_path=None,
                 **kwargs):
        super().__init__(**kwargs)
        pico_cfg = dict(
            access_key=access_key,
            keywords=keywords,
            keyword_paths=keyword_paths,
            model_path=model_path,
            sensitivities=sensitivities,
            library_path=library_path,
        )
        self.pico_detector_init(pico_cfg)
        self.keywords = self.pico_cfg['keywords']
        # BUG FIX: when only keyword_paths is supplied, keywords is None and
        # is_wakeup() crashed on `self.keywords[result]`. Derive display names
        # from the file names (same scheme as the official Porcupine demo).
        if self.keywords is None:
            self.keywords = [
                os.path.splitext(os.path.basename(p))[0].split('_')[0]
                for p in self.keyword_paths
            ]
        print(f"PicovoiceRecorder initialized with keywords: {self.keywords}")

    def pico_detector_init(self, pico_cfg):
        """Validate the Porcupine config and create the detector engine.

        Raises:
            ValueError: neither keywords nor keyword_paths given, or the
                keyword/sensitivity counts differ.
            pvporcupine.PorcupineError subclasses: engine creation failures.
        """
        if pico_cfg['keyword_paths'] is None:
            if pico_cfg['keywords'] is None:
                raise ValueError(f"Either `--keywords` or `--keyword_paths` must be set. \
                    Available keywords: {list(pvporcupine.KEYWORDS)}")
            keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in pico_cfg['keywords']]
        else:
            keyword_paths = pico_cfg['keyword_paths']
        # A single float sensitivity applies uniformly to every keyword.
        if pico_cfg['sensitivities'] is None:
            pico_cfg['sensitivities'] = [0.5] * len(keyword_paths)
        elif isinstance(pico_cfg['sensitivities'], float):
            pico_cfg['sensitivities'] = [pico_cfg['sensitivities']] * len(keyword_paths)
        if len(keyword_paths) != len(pico_cfg['sensitivities']):
            raise ValueError('Number of keywords does not match the number of sensitivities.')
        try:
            self.porcupine = pvporcupine.create(
                access_key=pico_cfg['access_key'],
                keywords=pico_cfg['keywords'],
                keyword_paths=keyword_paths,
                model_path=pico_cfg['model_path'],
                sensitivities=pico_cfg['sensitivities'],
                library_path=pico_cfg['library_path'])
        except pvporcupine.PorcupineInvalidArgumentError as e:
            print("One or more arguments provided to Porcupine is invalid: ", pico_cfg.keys())
            print(e)
            raise e
        except pvporcupine.PorcupineActivationError as e:
            print("AccessKey activation error")
            raise e
        except pvporcupine.PorcupineActivationLimitError as e:
            print("AccessKey '%s' has reached it's temporary device limit" % pico_cfg['access_key'])
            raise e
        except pvporcupine.PorcupineActivationRefusedError as e:
            print("AccessKey '%s' refused" % pico_cfg['access_key'])
            raise e
        except pvporcupine.PorcupineActivationThrottledError as e:
            print("AccessKey '%s' has been throttled" % pico_cfg['access_key'])
            raise e
        except pvporcupine.PorcupineError as e:
            print("Failed to initialize Porcupine")
            raise e
        self.keyword_paths = keyword_paths  # kept for keyword-name fallback
        self.pico_cfg = pico_cfg

    def is_wakeup(self, data):
        """Return True if `data` (one Porcupine-sized PCM frame) holds a wake word."""
        # Porcupine expects a sequence of int16 samples, frame_length long.
        pcm = struct.unpack_from("h" * self.porcupine.frame_length, data)
        result = self.porcupine.process(pcm)
        if result >= 0:
            print('[%s] Detected %s' % (str(datetime.now()), self.keywords[result]))
            return True
        return False

    def record_picovoice(self, return_type=None, exception_on_overflow=False, queue=None):
        """Block, reading frames from the mic, until a wake word is detected.

        `return_type` and `queue` are accepted for interface compatibility
        but unused; always returns True on detection.
        """
        print("Recording started. Press Ctrl+C to stop.")
        while True:
            data = self.record_chunk_voice(
                return_type=None,
                CHUNK=self.porcupine.frame_length,
                exception_on_overflow=exception_on_overflow,
                queue=queue)
            if self.is_wakeup(data):
                break
        return True