import io
import os
import time
import pyaudio
import wave
import json
import warnings
import threading
import numpy as np
from collections import deque

from .common_utils import encode_bytes2str, decode_str2bytes

from takway.board import *

try:
    import keyboard
except ImportError:
    pass


def play_audio(audio_data, type='base64'):
    '''
    Play an audio byte stream. `type='base64'` is reserved for base64-encoded
    input, but no decoding is performed here; pass raw 16-bit PCM bytes.
    '''
    # PyAudio configuration
    p = pyaudio.PyAudio()
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=22050, output=True)

    # Play the audio
    stream.write(audio_data)
    stream.stop_stream()
    stream.close()
    p.terminate()
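
# Example usage (a minimal sketch; assumes `audio_b64` holds a base64-encoded
# stream of 16-bit mono PCM at 22050 Hz, matching the stream opened above):
# import base64
# audio_b64 = "..."  # hypothetical base64 string
# play_audio(base64.b64decode(audio_b64))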


'''
import librosa
def reshape_sample_rate(audio, sr_original=None, sr_target=16000):
    # Get the original sample rate and audio data
    if isinstance(audio, tuple):
        sr_original, audio_data = audio
    else:
        audio_data = np.frombuffer(audio, dtype=np.int16) if isinstance(audio, bytes) else audio
        assert sr_original is not None, f"sr_original should be provided if audio is \
            bytes or numpy.ndarray, but got sr_original `{sr_original}`."

    if isinstance(audio_data, np.ndarray):
        if audio_data.dtype == np.dtype('int16'):
            audio_data = audio_data.astype(np.float32) / np.iinfo(np.int16).max
        assert audio_data.dtype == np.dtype('float32'), f"audio_data should be float32, \
            but got {audio_data.dtype}."
    else:
        raise TypeError(f"audio_data should be numpy.ndarray, but got {type(audio_data)}.")

    # Resample the audio data
    audio_data_resampled = librosa.resample(audio_data, orig_sr=sr_original, target_sr=sr_target)

    if audio_data_resampled.dtype == np.dtype('float32'):
        audio_data_resampled = np.int16(audio_data_resampled * np.iinfo(np.int16).max)

    # If the input was bytes, return the resampled data as bytes
    if isinstance(audio, bytes):
        audio_data_resampled = audio_data_resampled.tobytes()

    return audio_data_resampled

# Example usage:
# If your audio data is in bytes (sr_original is required):
# audio_bytes = b'...'  # Your audio data as bytes
# audio_data_resampled = reshape_sample_rate(audio_bytes, sr_original=44100)

# If your audio data is in numpy int16 (sr_original is required):
# audio_int16 = np.array([...], dtype=np.int16)  # Your audio data as numpy int16
# audio_data_resampled = reshape_sample_rate(audio_int16, sr_original=44100)
'''


# ####################################################### #
# base audio class
# ####################################################### #

class BaseAudio:
    def __init__(self,
                 filename=None,
                 input=False,
                 output=False,
                 CHUNK=1024,
                 FORMAT=pyaudio.paInt16,
                 CHANNELS=1,
                 RATE=16000,
                 input_device_index=None,
                 output_device_index=None,
                 **kwargs):
        self.CHUNK = CHUNK
        self.FORMAT = FORMAT
        self.CHANNELS = CHANNELS
        self.RATE = RATE
        self.filename = filename
        assert input != output, "exactly one of input and output must be True, \
            but got input={} and output={}.".format(input, output)
        print("------------------------------------------")
        print(f"{'Input' if input else 'Output'} Audio Initialization: ")
        print(f"CHUNK: {self.CHUNK} \nFORMAT: {self.FORMAT} \nCHANNELS: {self.CHANNELS} \nRATE: {self.RATE} \ninput_device_index: {input_device_index} \noutput_device_index: {output_device_index}")
        print("------------------------------------------")
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(format=FORMAT,
                                  channels=CHANNELS,
                                  rate=RATE,
                                  input=input,
                                  output=output,
                                  input_device_index=input_device_index,
                                  output_device_index=output_device_index,
                                  **kwargs)

    def load_audio_file(self, wav_file):
        with wave.open(wav_file, 'rb') as wf:
            params = wf.getparams()
            frames = wf.readframes(params.nframes)
            print("Audio file loaded.")
            # Audio Parameters
            # print("Channels:", params.nchannels)
            # print("Sample width:", params.sampwidth)
            # print("Frame rate:", params.framerate)
            # print("Number of frames:", params.nframes)
            # print("Compression type:", params.comptype)
        return frames

    def check_audio_type(self, audio_data, return_type=None):
        assert return_type in ['bytes', 'io', None], \
            "return_type should be 'bytes', 'io' or None."
        if isinstance(audio_data, str):
            if len(audio_data) > 50:
                # long strings are assumed to be base64-encoded audio
                audio_data = decode_str2bytes(audio_data)
            else:
                assert os.path.isfile(audio_data), \
                    "audio_data should be a file path or a bytes object."
                wf = wave.open(audio_data, 'rb')
                audio_data = wf.readframes(wf.getnframes())
        elif isinstance(audio_data, np.ndarray):
            if audio_data.dtype == np.dtype('float32'):
                audio_data = np.int16(audio_data * np.iinfo(np.int16).max)
            audio_data = audio_data.tobytes()
        elif isinstance(audio_data, bytes):
            pass
        else:
            raise TypeError(f"audio_data must be bytes, numpy.ndarray or str, \
                but got {type(audio_data)}")

        if return_type is None:
            return audio_data
        return self.write_wave(None, [audio_data], return_type)

    def write_wave(self, filename, frames, return_type='io'):
        """Write audio data to a file, a file-like object, or raw bytes."""
        if isinstance(frames, bytes):
            frames = [frames]
        if not isinstance(frames, list):
            raise TypeError("frames should be \
                a list of bytes or a bytes object, \
                but got {}.".format(type(frames)))

        if return_type == 'io':
            if filename is None:
                # fall back to the default filename, else to an in-memory buffer
                filename = self.filename if self.filename else io.BytesIO()
            return self.write_wave_io(filename, frames)
        elif return_type == 'bytes':
            return self.write_wave_bytes(frames)

    def write_wave_io(self, filename, frames):
        """
        Write audio data to a file-like object.

        Args:
            filename: [string or file-like object], file path or file-like object to write
            frames: list of bytes, audio data to write
        """
        wf = wave.open(filename, 'wb')

        # Set the WAV file parameters
        wf.setnchannels(self.CHANNELS)
        wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
        wf.setframerate(self.RATE)
        wf.writeframes(b''.join(frames))
        wf.close()
        if isinstance(filename, io.BytesIO):
            filename.seek(0)  # reset file pointer to beginning
        return filename

    def write_wave_bytes(self, frames):
        """Write audio data to a bytes object."""
        return b''.join(frames)
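
# Example usage (a minimal sketch; BaseAudio is normally used through its
# subclasses, but write_wave can wrap raw PCM bytes into an in-memory WAV):
# base = BaseAudio(output=True)
# wav_io = base.write_wave(None, b'\x00\x00' * 16000, return_type='io')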


# ####################################################### #
# play audio data from Speaker
# ####################################################### #

class AudioPlayer(BaseAudio):
    def __init__(self,
                 RATE=22050,
                 **kwargs):
        super().__init__(output=True, RATE=RATE, **kwargs)

    def play(self, audio_data):
        # print("Playing audio data...")
        audio_data = self.check_audio_type(audio_data, return_type=None)

        # write the audio in CHUNK-sized pieces; the final slice may be shorter
        for i in range(0, len(audio_data), self.CHUNK):
            self.stream.write(audio_data[i:i+self.CHUNK])
            # print("Playing audio data...{}/{}".format(i, len(audio_data)))
        # print("Audio data played.")

    def close(self):
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()
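
# Example usage (a minimal sketch; `speech.wav` is a hypothetical 22050 Hz
# mono 16-bit WAV matching the player's defaults):
# player = AudioPlayer()
# player.play(player.load_audio_file("speech.wav"))
# player.close()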


# ####################################################### #
# record audio data from microphone
# ####################################################### #

class BaseRecorder(BaseAudio):
    def __init__(self,
                 input=True,
                 base_chunk_size=None,
                 RATE=16000,
                 **kwargs):
        super().__init__(input=input, RATE=RATE, **kwargs)
        self.base_chunk_size = base_chunk_size
        if base_chunk_size is None:
            self.base_chunk_size = self.CHUNK

    def record(self,
               filename,
               duration=5,
               return_type='io',
               logger=None):
        if logger is not None:
            logger.info("Recording started.")
        else:
            print("Recording started.")
        frames = []
        for i in range(0, int(self.RATE / self.CHUNK * duration)):
            data = self.stream.read(self.CHUNK, exception_on_overflow=False)
            frames.append(data)
        if logger is not None:
            logger.info("Recording stopped.")
        else:
            print("Recording stopped.")
        return self.write_wave(filename, frames, return_type)

    def record_chunk_voice(self,
                           return_type='bytes',
                           CHUNK=None,
                           exception_on_overflow=True,
                           queue=None):
        data = self.stream.read(self.CHUNK if CHUNK is None else CHUNK,
                                exception_on_overflow=exception_on_overflow)
        if return_type is not None:
            return self.write_wave(None, [data], return_type)
        return data
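
# Example usage (a minimal sketch; records five seconds from the default
# microphone into a hypothetical `sample.wav`):
# recorder = BaseRecorder()
# recorder.record("sample.wav", duration=5, return_type='io')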


class HDRecorder(BaseRecorder):
    def __init__(self,
                 board=None,
                 hd_trigger='keyboard',
                 keyboard_key='space',
                 voice_trigger=True,
                 hd_chunk_size=None,
                 hd_detect_threshold=50,
                 **kwargs):
        super().__init__(**kwargs)
        assert hd_trigger in ['keyboard', 'button']

        self.hd_trigger = hd_trigger
        self.voice_trigger = voice_trigger

        self.hd_chunk_size = hd_chunk_size
        if hd_chunk_size is None:
            self.hd_chunk_size = self.base_chunk_size

        if board is None:
            assert hd_trigger == 'keyboard', "hd_trigger should be `keyboard` if board is `None`."
            self.keyboard_key = keyboard_key
            self.hardware = Keyboard(hd_trigger, keyboard_key, hd_detect_threshold)
        else:
            assert hd_trigger == 'button', f"hd_trigger should be `button` if board is `v329` or `orangepi`, but got `{hd_trigger}`."
            if board == 'v329':
                self.hardware = V329(hd_trigger, hd_detect_threshold)
            elif board == 'orangepi':
                self.hardware = OrangePi(hd_trigger, hd_detect_threshold, enable_start_light=True)
            else:
                raise ValueError(f"Unsupported board `{board}`; expected `v329` or `orangepi`.")
        print(f"Using {hd_trigger} as hardware trigger.")

    def wait_for_hardware_pressed(self):
        return self.hardware.wait_for_hardware_pressed()

    @property
    def is_hardware_pressed(self):
        return self.hardware.is_hardware_pressed

    def record_hardware(self, return_type='bytes'):
        """Record audio while the hardware trigger is held down."""
        print("Recording started for hardware trigger.")
        frames = []
        self.wait_for_hardware_pressed()
        while True:
            if self.hd_trigger == 'keyboard':
                if keyboard.is_pressed(self.keyboard_key):
                    print("recording...")
                    data = self.record_chunk_voice(
                        CHUNK=self.CHUNK,
                        return_type=None,
                        exception_on_overflow=False)
                    frames.append(data)
                else:
                    break
            elif self.hd_trigger == 'button':
                # originally called an undefined `self.get_button_status()`;
                # the `is_hardware_pressed` property is the likely intent
                if self.is_hardware_pressed:
                    data = self.stream.read(self.CHUNK)
                    frames.append(data)
                else:
                    break
            else:
                raise ValueError("hd_trigger should be 'keyboard' or 'button'.")
        print("Recording stopped.")
        return self.write_wave(self.filename, frames, return_type)

    '''
    def record(self, return_type='bytes', queue=None):
        if self.hd_trigger == 'all':
            value_list = []  # track the button value states
            if keyboard.is_pressed(self.keyboard_key):
                audio_data = self.record_keyboard(return_type, queue)
            elif self.button.get_value() == 0:
                if self.get_button_status():
                    audio_data = self.record_button(return_type, queue)
            else:
                audio_data = self.record_voice(return_type, queue)
        elif self.hd_trigger == 'keyboard':
            print("Press SPACE to start recording.")
            keyboard.wait("space")
            audio_data = self.record_keyboard(return_type, queue)
        elif self.hd_trigger == 'button':
            print("Touch to start recording...")
            if self.button.get_value() == 0:
                if self.get_button_status():
                    audio_data = self.record_button(return_type, queue)
        else:
            audio_data = self.record_voice(return_type, queue)

        return audio_data

    def record_keyboard(self, return_type='bytes', queue=None):
        """record audio when keyboard pressing"""
        print("Recording started.")
        frames = []
        recording = True
        while recording:
            if keyboard.is_pressed(self.keyboard_key):
                data = self.stream.read(self.CHUNK)
                frames.append(data)
            else:
                recording = False
                print("Recording stopped.")
        return self.write_wave(self.filename, frames, return_type)

    def record_button(self, return_type='bytes', queue=None):
        """record audio when button pressing"""
        print("Recording started.")
        frames = []
        recording = True
        while recording:
            value = self.button.get_value()
            if value == 0:
                data = self.stream.read(self.CHUNK)
                frames.append(data)
            else:
                recording = False
                print("Recording stopped.")
        return self.write_wave(self.filename, frames, return_type)
    '''
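
# Example usage (a minimal sketch; hold SPACE to record from the default
# microphone and release it to stop):
# hd_recorder = HDRecorder(hd_trigger='keyboard', keyboard_key='space')
# audio_bytes = hd_recorder.record_hardware(return_type='bytes')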


# ####################################################### #
# record audio data from microphone with VAD
# ####################################################### #
try:
    import webrtcvad
    webrtcvad_available = True
except ImportError:
    warnings.warn("webrtcvad module not found, please install it if you use the `vad` trigger.")
    webrtcvad_available = False


class VADRecorder(HDRecorder):
    def __init__(self, vad_sensitivity=0, frame_duration=30, vad_buffer_size=7, min_act_time=0.3, **kwargs):
        super().__init__(**kwargs)
        if webrtcvad_available:
            self.vad = webrtcvad.Vad(vad_sensitivity)
        self.vad_buffer_size = vad_buffer_size
        self.vad_chunk_size = int(self.RATE * frame_duration / 1000)

        self.min_act_time = min_act_time  # minimum speech duration, in seconds

        self.is_currently_speaking = False
        self.frames = []

    def is_speech(self, data):
        return self.vad.is_speech(data, self.RATE)

    def vad_filter(self, data):
        pass
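
    # Note: webrtcvad only accepts 10, 20 or 30 ms frames at 8000/16000/32000/
    # 48000 Hz, so `frame_duration` must be one of 10/20/30 and RATE one of
    # those rates. A minimal sketch of the frame arithmetic used above:
    # rate, frame_duration = 16000, 30
    # vad_chunk_size = int(rate * frame_duration / 1000)  # 480 samples per frame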

    def vad_record(self, return_type='io', CHUNK=None, queue=None, save_file=False):
        """Record audio, detect voice activity, and return the segmented speech audio."""
        all_frames = []
        # reset any state left over from a previous call
        self.frames = []
        self.is_currently_speaking = False

        buffer_size = self.vad_buffer_size
        # appending to a full deque with maxlen set discards the oldest element
        # automatically, so no explicit popleft() is needed
        active_buffer = deque([False for i in range(buffer_size)], maxlen=buffer_size)
        audio_buffer = deque(maxlen=buffer_size)
        silence_buffer = deque([True for i in range(buffer_size)], maxlen=buffer_size)

        print("vad_recorded_audio VAD started. Press Ctrl+C to stop.")
        try:
            while True:
                data = self.stream.read(self.vad_chunk_size)
                all_frames.append(data)
                print(f"VAD processing..., is_speech: {self.is_speech(data)}")
                if self.is_speech(data):
                    # flag buffers
                    active_buffer.append(True)
                    silence_buffer.append(False)
                    # buffer the chunk until speech is confirmed
                    audio_buffer.append(data)
                    # if the whole buffer is speech, the detection criterion is met
                    if all(active_buffer) and not self.is_currently_speaking:
                        print("Speech start detected")
                        self.is_currently_speaking = True
                        # include the buffered lead-in chunks (the current chunk among them)
                        self.frames.extend(audio_buffer)
                    elif self.is_currently_speaking:
                        self.frames.append(data)
                else:
                    # flag buffers
                    active_buffer.append(False)
                    silence_buffer.append(True)
                    # speech was detected earlier and recording is in progress
                    if self.is_currently_speaking:
                        # end-of-speech condition: the whole buffer is silence
                        if all(silence_buffer):
                            print("Speech end detected")
                            break
        except KeyboardInterrupt:
            print("KeyboardInterrupt")

        finally:
            print("Stopping...")
            if len(all_frames) > 0:
                print(f"ALL frame: {len(all_frames)}")
                print(f"ASR frame: {len(self.frames)}")
            if save_file:
                timestamp = time.time()
                self.write_wave(f"output_{timestamp}_all.wav", all_frames)
                self.write_wave(f"output_{timestamp}.wav", self.frames)
            # note: the result is currently always returned as bytes,
            # regardless of the `return_type` argument
            return self.write_wave(None, self.frames, return_type='bytes')
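
# Example usage (a minimal sketch; assumes webrtcvad is installed and a
# microphone is available):
# vad_recorder = VADRecorder(vad_sensitivity=1)
# speech_bytes = vad_recorder.vad_record(save_file=False)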


# ####################################################### #
# record audio data from microphone with PicoVoice hot words detection
# ####################################################### #

import struct
from datetime import datetime
import pvporcupine


class PicovoiceRecorder(VADRecorder):
    def __init__(self,
                 access_key,
                 keywords=None,
                 keyword_paths=None,
                 model_path=None,
                 sensitivities=0.5,
                 library_path=None,
                 **kwargs):

        super().__init__(**kwargs)

        pico_cfg = dict(
            access_key=access_key,
            keywords=keywords,
            keyword_paths=keyword_paths,
            model_path=model_path,
            sensitivities=sensitivities,
            library_path=library_path,
        )

        self.pico_detector_init(pico_cfg)

        self.keywords = self.pico_cfg['keywords']
        print(f"PicovoiceRecorder initialized with keywords: {self.keywords}")

    def pico_detector_init(self, pico_cfg):
        if pico_cfg['keyword_paths'] is None:
            if pico_cfg['keywords'] is None:
                raise ValueError(f"Either `keywords` or `keyword_paths` must be set. \
                    Available keywords: {list(pvporcupine.KEYWORDS)}")

            keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in pico_cfg['keywords']]
        else:
            keyword_paths = pico_cfg['keyword_paths']

        if pico_cfg['sensitivities'] is None:
            pico_cfg['sensitivities'] = [0.5] * len(keyword_paths)
        elif isinstance(pico_cfg['sensitivities'], float):
            pico_cfg['sensitivities'] = [pico_cfg['sensitivities']] * len(keyword_paths)

        if len(keyword_paths) != len(pico_cfg['sensitivities']):
            raise ValueError('Number of keywords does not match the number of sensitivities.')

        try:
            self.porcupine = pvporcupine.create(
                access_key=pico_cfg['access_key'],
                keywords=pico_cfg['keywords'],
                keyword_paths=keyword_paths,
                model_path=pico_cfg['model_path'],
                sensitivities=pico_cfg['sensitivities'],
                library_path=pico_cfg['library_path'])
        except pvporcupine.PorcupineInvalidArgumentError as e:
            print("One or more arguments provided to Porcupine is invalid: ", pico_cfg.keys())
            print(e)
            raise e
        except pvporcupine.PorcupineActivationError as e:
            print("AccessKey activation error")
            raise e
        except pvporcupine.PorcupineActivationLimitError as e:
            print("AccessKey '%s' has reached its temporary device limit" % pico_cfg['access_key'])
            raise e
        except pvporcupine.PorcupineActivationRefusedError as e:
            print("AccessKey '%s' refused" % pico_cfg['access_key'])
            raise e
        except pvporcupine.PorcupineActivationThrottledError as e:
            print("AccessKey '%s' has been throttled" % pico_cfg['access_key'])
            raise e
        except pvporcupine.PorcupineError as e:
            print("Failed to initialize Porcupine")
            raise e

        self.pico_cfg = pico_cfg

    def is_wakeup(self, data):
        pcm = struct.unpack_from("h" * self.porcupine.frame_length, data)
        result = self.porcupine.process(pcm)
        # print(f"picovoice result: {result}")
        if result >= 0:
            # self.keywords may be None when custom keyword_paths are used
            keyword = self.keywords[result] if self.keywords else f"keyword #{result}"
            print('[%s] Detected %s' % (str(datetime.now()), keyword))
            # self.write_wave(f"output_{time.time()}.wav", [data])
            # print(f"write to: output_{time.time()}.wav")
            return True
        return False

    def record_picovoice(self, return_type=None, exception_on_overflow=False, queue=None):
        print("Recording started. Press Ctrl+C to stop.")
        while True:
            data = self.record_chunk_voice(
                return_type=None,
                CHUNK=self.porcupine.frame_length,
                exception_on_overflow=exception_on_overflow,
                queue=queue)

            wake_up = self.is_wakeup(data)
            if wake_up:
                break
        return True
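
# Example usage (a minimal sketch; "YOUR_ACCESS_KEY" is a placeholder for a
# real Picovoice AccessKey and 'porcupine' is a built-in keyword):
# recorder = PicovoiceRecorder(access_key="YOUR_ACCESS_KEY", keywords=['porcupine'])
# recorder.record_picovoice()  # blocks until the wake word is heard
# speech_bytes = recorder.vad_record(save_file=False)  # then capture the utterance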