bing 2024-05-11 22:34:08 +08:00
parent a4c6ee2637
commit c6da59ab33
14 changed files with 957 additions and 121 deletions

0
README.md Normal file

29
demo.py Normal file

@@ -0,0 +1,29 @@
from takway.audio_utils import BaseRecorder
from takway.stt.funasr_utils import FunAutoSpeechRecognizer
from takway.stt.modified_funasr import ModifiedRecognizer

def asr_file_stream(file_path=r'.\examples\example_recording.wav'):
    rec = BaseRecorder()
    data = rec.load_audio_file(file_path)

    asr = ModifiedRecognizer(use_punct=True, use_emotion=True, use_speaker_ver=True)
    asr.initialize_speaker(r".\examples\example_recording.wav")

    text_dict = asr.streaming_recognize(data, auto_det_end=True)
    print("===============================================")
    print(f"text_dict: {text_dict}")
    if not isinstance(text_dict, str):
        print("".join(text_dict['text']))
    print("===============================================")

    emotion_dict = asr.recognize_emotion(data)
    print(f"emotion_dict: {emotion_dict}")
    if not isinstance(emotion_dict, str):
        max_index = emotion_dict['scores'].index(max(emotion_dict['scores']))
        print("emotion: " + emotion_dict['labels'][max_index])

asr_file_stream()
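
demo.py pushes the whole recording in one call with auto_det_end=True. The same API can also be driven chunk by chunk; a minimal sketch (the chunk size is an assumption derived from the model's chunk_ms=480 at 16 kHz, and streaming_recognize's is_end flag is documented in modified_funasr below):

def asr_stream_chunks(data, asr):
    chunk_size = 15360  # assumed: 480 ms of 16 kHz 16-bit mono audio (0.48 * 16000 * 2 bytes)
    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        is_end = i + chunk_size >= len(data)          # flag the final chunk
        text_dict = asr.streaming_recognize(chunk, is_end=is_end)
        if not isinstance(text_dict, str):            # a plain str return means "Other People"
            print("".join(text_dict['text']))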

Binary file not shown.

578
takway/audio_utils.py Normal file

@@ -0,0 +1,578 @@
import io
import os
import time
import pyaudio
import wave
import json
import warnings
import threading
import numpy as np
from collections import deque
from .common_utils import encode_bytes2str, decode_str2bytes
from takway.board import *

try:
    import keyboard
except ImportError:
    # optional: only needed for the 'keyboard' hardware trigger
    pass
def play_audio(audio_data, type='base64'):
    '''
    Read a base64-encoded audio stream and play it.
    '''
    # PyAudio configuration
    p = pyaudio.PyAudio()
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=22050, output=True)
    # play the audio
    stream.write(audio_data)
    stream.stop_stream()
    stream.close()
    p.terminate()
'''
import librosa

def reshape_sample_rate(audio, sr_original=None, sr_target=16000):
    # get the original sample rate and audio data
    if isinstance(audio, tuple):
        sr_original, audio_data = audio
    elif isinstance(audio, bytes):
        audio_data = np.frombuffer(audio, dtype=np.int16)
        assert sr_original is not None, f"sr_original should be provided if audio is a \
            numpy.ndarray, but got sr_original `{sr_original}`."

    if isinstance(audio_data, np.ndarray):
        if audio_data.dtype == np.dtype('int16'):
            audio_data = audio_data.astype(np.float32) / np.iinfo(np.int16).max
        assert audio_data.dtype == np.dtype('float32'), f"audio_data should be float32, \
            but got {audio_data.dtype}."
    else:
        raise TypeError(f"audio_data should be numpy.ndarray, but got {type(audio_data)}.")

    # resample the audio data
    audio_data_resampled = librosa.resample(audio_data, orig_sr=sr_original, target_sr=sr_target)
    if audio_data_resampled.dtype == np.dtype('float32'):
        audio_data_resampled = np.int16(audio_data_resampled * np.iinfo(np.int16).max)

    # If the input was bytes, return the resampled data as bytes
    if isinstance(audio, bytes):
        audio_data_resampled = audio_data_resampled.tobytes()
    return audio_data_resampled

# Example usage:
# If your audio data is in bytes:
# audio_bytes = b'...'  # Your audio data as bytes
# audio_data_resampled = reshape_sample_rate(audio_bytes)

# If your audio data is in numpy int16:
# audio_int16 = np.array([...], dtype=np.int16)  # Your audio data as numpy int16
# audio_data_resampled = reshape_sample_rate(audio_int16)
'''
# ####################################################### #
# base audio class
# ####################################################### #
class BaseAudio:
    def __init__(self,
                 filename=None,
                 input=False,
                 output=False,
                 CHUNK=1024,
                 FORMAT=pyaudio.paInt16,
                 CHANNELS=1,
                 RATE=16000,
                 input_device_index=None,
                 output_device_index=None,
                 **kwargs):
        self.CHUNK = CHUNK
        self.FORMAT = FORMAT
        self.CHANNELS = CHANNELS
        self.RATE = RATE
        self.filename = filename
        assert input != output, "input and output cannot be the same, \
            but got input={} and output={}.".format(input, output)
        print("------------------------------------------")
        print(f"{'Input' if input else 'Output'} Audio Initialization: ")
        print(f"CHUNK: {self.CHUNK} \nFORMAT: {self.FORMAT} \nCHANNELS: {self.CHANNELS} \nRATE: {self.RATE} \ninput_device_index: {input_device_index} \noutput_device_index: {output_device_index}")
        print("------------------------------------------")
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(format=FORMAT,
                                  channels=CHANNELS,
                                  rate=RATE,
                                  input=input,
                                  output=output,
                                  input_device_index=input_device_index,
                                  output_device_index=output_device_index,
                                  **kwargs)

    def load_audio_file(self, wav_file):
        with wave.open(wav_file, 'rb') as wf:
            params = wf.getparams()
            frames = wf.readframes(params.nframes)
            print("Audio file loaded.")
            # Audio Parameters
            # print("Channels:", params.nchannels)
            # print("Sample width:", params.sampwidth)
            # print("Frame rate:", params.framerate)
            # print("Number of frames:", params.nframes)
            # print("Compression type:", params.comptype)
        return frames

    def check_audio_type(self, audio_data, return_type=None):
        assert return_type in ['bytes', 'io', None], \
            "return_type should be 'bytes', 'io' or None."
        if isinstance(audio_data, str):
            if len(audio_data) > 50:
                audio_data = decode_str2bytes(audio_data)
            else:
                assert os.path.isfile(audio_data), \
                    "audio_data should be a file path or a bytes object."
                wf = wave.open(audio_data, 'rb')
                audio_data = wf.readframes(wf.getnframes())
        elif isinstance(audio_data, np.ndarray):
            if audio_data.dtype == np.dtype('float32'):
                audio_data = np.int16(audio_data * np.iinfo(np.int16).max)
            audio_data = audio_data.tobytes()
        elif isinstance(audio_data, bytes):
            pass
        else:
            raise TypeError(f"audio_data must be bytes, numpy.ndarray or str, \
                but got {type(audio_data)}")

        if return_type is None:
            return audio_data
        return self.write_wave(None, [audio_data], return_type)

    def write_wave(self, filename, frames, return_type='io'):
        """Write audio data to a file."""
        if isinstance(frames, bytes):
            frames = [frames]
        if not isinstance(frames, list):
            raise TypeError("frames should be \
                a list of bytes or a bytes object, \
                but got {}.".format(type(frames)))

        if return_type == 'io':
            if filename is None:
                filename = io.BytesIO()
            if self.filename:
                filename = self.filename
            return self.write_wave_io(filename, frames)
        elif return_type == 'bytes':
            return self.write_wave_bytes(frames)

    def write_wave_io(self, filename, frames):
        """
        Write audio data to a file-like object.

        Args:
            filename: [string or file-like object], file path or file-like object to write
            frames: list of bytes, audio data to write
        """
        wf = wave.open(filename, 'wb')
        # set the WAV file parameters
        wf.setnchannels(self.CHANNELS)
        wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
        wf.setframerate(self.RATE)
        wf.writeframes(b''.join(frames))
        wf.close()
        if isinstance(filename, io.BytesIO):
            filename.seek(0)  # reset file pointer to beginning
        return filename

    def write_wave_bytes(self, frames):
        """Write audio data to a bytes object."""
        return b''.join(frames)
# ####################################################### #
# play audio data from Speaker
# ####################################################### #
class AudioPlayer(BaseAudio):
    def __init__(self,
                 RATE=22050,
                 **kwargs):
        super().__init__(output=True, RATE=RATE, **kwargs)

    def play(self, audio_data):
        # print("Playing audio data...")
        audio_data = self.check_audio_type(audio_data, return_type=None)
        for i in range(0, len(audio_data), self.CHUNK):
            self.stream.write(audio_data[i:i+self.CHUNK])
            print("Playing audio data...{}/{}".format(i, len(audio_data)))
        self.stream.write(audio_data[i+self.CHUNK:])  # flush any remainder
        # print("Audio data played.")

    def close(self):
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()
# ####################################################### #
# record audio data from microphone
# ####################################################### #
class BaseRecorder(BaseAudio):
    def __init__(self,
                 input=True,
                 base_chunk_size=None,
                 RATE=16000,
                 **kwargs):
        super().__init__(input=input, RATE=RATE, **kwargs)
        self.base_chunk_size = base_chunk_size
        if base_chunk_size is None:
            self.base_chunk_size = self.CHUNK

    def record(self,
               filename,
               duration=5,
               return_type='io',
               logger=None):
        if logger is not None:
            logger.info("Recording started.")
        else:
            print("Recording started.")
        frames = []
        for i in range(0, int(self.RATE / self.CHUNK * duration)):
            data = self.stream.read(self.CHUNK, exception_on_overflow=False)
            frames.append(data)
        if logger is not None:
            logger.info("Recording stopped.")
        else:
            print("Recording stopped.")
        return self.write_wave(filename, frames, return_type)

    def record_chunk_voice(self,
                           return_type='bytes',
                           CHUNK=None,
                           exception_on_overflow=True,
                           queue=None):
        data = self.stream.read(self.CHUNK if CHUNK is None else CHUNK,
                                exception_on_overflow=exception_on_overflow)
        if return_type is not None:
            return self.write_wave(None, [data], return_type)
        return data
class HDRecorder(BaseRecorder):
    def __init__(self,
                 board=None,
                 hd_trigger='keyboard',
                 keyboard_key='space',
                 voice_trigger=True,
                 hd_chunk_size=None,
                 hd_detect_threshold=50,
                 **kwargs):
        super().__init__(**kwargs)
        assert hd_trigger in ['keyboard', 'button']
        self.hd_trigger = hd_trigger
        self.voice_trigger = voice_trigger
        self.hd_chunk_size = hd_chunk_size
        if hd_chunk_size is None:
            self.hd_chunk_size = self.base_chunk_size

        if board is None:
            assert hd_trigger == 'keyboard', "board should be `None` if hd_trigger is `keyboard`."
            self.keyboard_key = keyboard_key
            self.hardware = Keyboard(hd_trigger, keyboard_key, hd_detect_threshold)
        else:
            assert hd_trigger == 'button', f"hd_trigger should be `button` if board is `v329` or `orangepi`, but got `{hd_trigger}`."
            if board == 'v329':
                self.hardware = V329(hd_trigger, hd_detect_threshold)
            elif board == 'orangepi':
                self.hardware = OrangePi(hd_trigger, hd_detect_threshold)
        print(f"Using {hd_trigger} as hardware trigger.")

    def wait_for_hardware_pressed(self):
        return self.hardware.wait_for_hardware_pressed()

    @property
    def is_hardware_pressed(self):
        return self.hardware.is_hardware_pressed

    def record_hardware(self, return_type='bytes'):
        """Record audio while the hardware trigger is held down."""
        print("Recording started for hardware trigger.")
        frames = []
        self.wait_for_hardware_pressed()
        while True:
            if self.hd_trigger == 'keyboard':
                if keyboard.is_pressed(self.keyboard_key):
                    print("recording...")
                    data = self.record_chunk_voice(
                        CHUNK=self.CHUNK,
                        return_type=None,
                        exception_on_overflow=False)
                    frames.append(data)
                else:
                    break
            elif self.hd_trigger == 'button':
                if self.hardware.button_status:  # was get_button_status(), which is not defined on this class
                    data = self.stream.read(self.CHUNK)
                    frames.append(data)
                else:
                    break
            else:
                raise ValueError("hd_trigger should be 'keyboard' or 'button'.")
        print("Recording stopped.")
        return self.write_wave(self.filename, frames, return_type)
'''
    def record(self, return_type='bytes', queue=None):
        if self.hd_trigger == 'all':
            value_list = []  # track the button value states
            if keyboard.is_pressed(self.keyboard_key):
                audio_data = self.record_keyboard(return_type, queue)
            elif self.button.get_value() == 0:
                if self.get_button_status():
                    audio_data = self.record_button(return_type, queue)
            else:
                audio_data = self.record_voice(return_type, queue)
        elif self.hd_trigger == 'keyboard':
            print("Press SPACE to start recording.")
            keyboard.wait("space")
            audio_data = self.record_keyboard(return_type, queue)
        elif self.hd_trigger == 'button':
            print("Touch to start recording...")
            if self.button.get_value() == 0:
                if self.get_button_status():
                    audio_data = self.record_button(return_type, queue)
        else:
            audio_data = self.record_voice(return_type, queue)
        return audio_data

    def record_keyboard(self, return_type='bytes', queue=None):
        """record audio while a keyboard key is pressed"""
        print("Recording started.")
        frames = []
        recording = True
        while recording:
            if keyboard.is_pressed(self.keyboard_key):
                data = self.stream.read(self.CHUNK)
                frames.append(data)
            else:
                recording = False
                print("Recording stopped.")
        return self.write_wave(self.filename, frames, return_type)

    def record_button(self, return_type='bytes', queue=None):
        """record audio while the button is pressed"""
        print("Recording started.")
        frames = []
        recording = True
        while recording:
            value = self.button.get_value()
            if value == 0:
                data = self.stream.read(CHUNK)
                frames.append(data)
            else:
                recording = False
                print("Recording stopped.")
        return self.write_wave(self.filename, frames, return_type)
'''
# ####################################################### #
# record audio data from microphone with VAD
# ####################################################### #
try:
    import webrtcvad
    webrtcvad_available = True
except ImportError:
    warnings.warn("webrtcvad module not found, please install it to use the `vad` hd_trigger.")
    webrtcvad_available = False

class VADRecorder(HDRecorder):
    def __init__(self, vad_sensitivity=1, frame_duration=30, vad_buffer_size=7, min_act_time=1, **kwargs):
        super().__init__(**kwargs)
        if webrtcvad_available:
            self.vad = webrtcvad.Vad(vad_sensitivity)
        self.vad_buffer_size = vad_buffer_size
        self.vad_chunk_size = int(self.RATE * frame_duration / 1000)
        self.min_act_time = min_act_time  # minimum speech duration, in seconds

        self.is_currently_speaking = False
        self.frames = []

    def is_speech(self, data):
        return self.vad.is_speech(data, self.RATE)

    def vad_filter(self, data):
        pass
    def vad_record(self, return_type='io', CHUNK=None, queue=None, save_file=False):
        """Record and run voice activity detection, returning the segmented speech audio."""
        all_frames = []
        buffer_size = self.vad_buffer_size
        active_buffer = deque([False for i in range(buffer_size)], maxlen=buffer_size)
        audio_buffer = deque(maxlen=buffer_size)
        silence_buffer = deque([True for i in range(buffer_size)], maxlen=buffer_size)

        print("vad_recorded_audio VAD started. Press Ctrl+C to stop.")
        try:
            while True:
                data = self.stream.read(self.vad_chunk_size)
                all_frames.append(data)
                print(f"VAD processing..., is_speech: {self.is_speech(data)}")
                if self.is_speech(data):
                    # update the flag buffers
                    active_buffer.append(True); active_buffer.popleft()
                    silence_buffer.append(False); silence_buffer.popleft()
                    # stash the chunk temporarily
                    audio_buffer.append(data)
                    # enough consecutive speech chunks were seen
                    if all(active_buffer):
                        if not self.is_currently_speaking:
                            print("Speech start detected")
                            self.is_currently_speaking = True
                            self.frames.extend(audio_buffer)  # include the buffered speech chunks too
                    if self.is_currently_speaking:
                        self.frames.append(data)
                else:
                    # update the flag buffers
                    active_buffer.append(False); active_buffer.popleft()
                    silence_buffer.append(True); silence_buffer.popleft()
                    # speech was detected earlier and recording is ongoing
                    if self.is_currently_speaking:
                        # end-of-speech flag
                        if all(silence_buffer):
                            print("Speech end detected")
                            break
        except KeyboardInterrupt:
            print("KeyboardInterrupt")
        finally:
            print("Stopping...")
            if len(all_frames) > 0:
                print(f"ALL frame: {len(all_frames)}")
                print(f"ASR frame: {len(self.frames)}")
            if save_file:
                self.write_wave(f"output_{time.time()}_all.wav", all_frames)
                self.write_wave(f"output_{time.time()}.wav", self.frames)
        return self.write_wave(None, self.frames, return_type='bytes')
# ####################################################### #
# record audio data from microphone with PicoVoice hot words detection
# ####################################################### #
import struct
from datetime import datetime
import pvporcupine

class PicovoiceRecorder(VADRecorder):
    def __init__(self,
                 access_key,
                 keywords=None,
                 keyword_paths=None,
                 model_path=None,
                 sensitivities=0.5,
                 library_path=None,
                 **kwargs):
        super().__init__(**kwargs)
        pico_cfg = dict(
            access_key=access_key,
            keywords=keywords,
            keyword_paths=keyword_paths,
            model_path=model_path,
            sensitivities=sensitivities,
            library_path=library_path,
        )
        self.pico_detector_init(pico_cfg)
        self.keywords = self.pico_cfg['keywords']
        print(f"PicovoiceRecorder initialized with keywords: {self.keywords}")

    def pico_detector_init(self, pico_cfg):
        if pico_cfg['keyword_paths'] is None:
            if pico_cfg['keywords'] is None:
                raise ValueError(f"Either `--keywords` or `--keyword_paths` must be set. \
                    Available keywords: {list(pvporcupine.KEYWORDS)}")
            keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in pico_cfg['keywords']]
        else:
            keyword_paths = pico_cfg['keyword_paths']

        if pico_cfg['sensitivities'] is None:
            pico_cfg['sensitivities'] = [0.5] * len(keyword_paths)
        elif isinstance(pico_cfg['sensitivities'], float):
            pico_cfg['sensitivities'] = [pico_cfg['sensitivities']] * len(keyword_paths)
        if len(keyword_paths) != len(pico_cfg['sensitivities']):
            raise ValueError('Number of keywords does not match the number of sensitivities.')

        try:
            self.porcupine = pvporcupine.create(
                access_key=pico_cfg['access_key'],
                keywords=pico_cfg['keywords'],
                keyword_paths=keyword_paths,
                model_path=pico_cfg['model_path'],
                sensitivities=pico_cfg['sensitivities'],
                library_path=pico_cfg['library_path'])
        except pvporcupine.PorcupineInvalidArgumentError as e:
            print("One or more arguments provided to Porcupine is invalid: ", pico_cfg.keys())
            print(e)
            raise e
        except pvporcupine.PorcupineActivationError as e:
            print("AccessKey activation error")
            raise e
        except pvporcupine.PorcupineActivationLimitError as e:
            print("AccessKey '%s' has reached its temporary device limit" % pico_cfg['access_key'])
            raise e
        except pvporcupine.PorcupineActivationRefusedError as e:
            print("AccessKey '%s' refused" % pico_cfg['access_key'])
            raise e
        except pvporcupine.PorcupineActivationThrottledError as e:
            print("AccessKey '%s' has been throttled" % pico_cfg['access_key'])
            raise e
        except pvporcupine.PorcupineError as e:
            print("Failed to initialize Porcupine")
            raise e
        self.pico_cfg = pico_cfg

    def is_wakeup(self, data):
        pcm = struct.unpack_from("h" * self.porcupine.frame_length, data)
        result = self.porcupine.process(pcm)
        # print(f"picovoice result: {result}")
        if result >= 0:
            print('[%s] Detected %s' % (str(datetime.now()), self.keywords[result]))
            return True
        # self.write_wave(f"output_{time.time()}.wav", [data])
        # print(f"write to: output_{time.time()}.wav")
        return False

    def record_picovoice(self, return_type=None, exception_on_overflow=False, queue=None):
        print("Recording started. Press Ctrl+C to stop.")
        while True:
            data = self.record_chunk_voice(
                return_type=None,
                CHUNK=self.porcupine.frame_length,
                exception_on_overflow=exception_on_overflow,
                queue=queue)
            if self.is_wakeup(data):
                break
        return True
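
Taken together, the recorder and player above compose into a simple record-then-play round trip; a minimal usage sketch (assumes a working microphone and speaker):

from takway.audio_utils import BaseRecorder, AudioPlayer

rec = BaseRecorder()                        # opens a 16 kHz input stream
audio = rec.record(None, duration=5, return_type='bytes')
player = AudioPlayer(RATE=16000)            # match the recording rate rather than the 22.05 kHz default
player.play(audio)                          # plays chunk by chunk
player.close()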

4
takway/board/__init__.py Normal file

@@ -0,0 +1,4 @@
from .base_hd import BaseHardware
from .keyboard import Keyboard
from .sipeed import V329
from .orangepi import OrangePi

32
takway/board/base_hd.py Normal file

@@ -0,0 +1,32 @@
import threading
import time

class BaseHardware:
    def __init__(self, hd_trigger=None, hd_detect_threshold=50):
        self.hd_trigger = hd_trigger
        self.hd_detect_threshold = hd_detect_threshold
        self.hd_lock = threading.Lock()
        self.shared_hd_status = False

    def init_hd_thread(self):
        hd_thread = threading.Thread(target=self.hd_detection_loop)
        hd_thread.start()
        # hd_thread.join()
        print("HD detection thread started.")

    def hd_detection_loop(self):
        pass

    @property
    def is_hardware_pressed(self):
        return self.shared_hd_status

    def wait_for_hardware_pressed(self):
        print("Waiting for hardware trigger.")
        while True:
            if self.is_hardware_pressed:
                time.sleep(0.01)
                break
        return True
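
A concrete trigger only has to keep shared_hd_status fresh from its own hd_detection_loop; a hedged sketch of a custom subclass (read_pin() is hypothetical — the Keyboard, OrangePi and V329 classes below are the real implementations):

class PollingTrigger(BaseHardware):
    def __init__(self):
        super().__init__(hd_trigger='button')
        self.init_hd_thread()               # starts hd_detection_loop in a thread

    def hd_detection_loop(self):
        while True:
            with self.hd_lock:
                self.shared_hd_status = read_pin()  # hypothetical: True while pressed
            time.sleep(0.001)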

32
takway/board/keyboard.py Normal file

@@ -0,0 +1,32 @@
import keyboard
import time
from takway.board.base_hd import BaseHardware
import datetime

t = 0
last_status = False

class Keyboard(BaseHardware):
    def __init__(self, hd_trigger='keyboard', keyboard_key='space', hd_detect_threshold=50):
        super().__init__(hd_trigger, hd_detect_threshold)
        self.keyboard_key = keyboard_key
        self.init_hd_thread()

    def hd_detection_loop(self):
        keyboard_status = False
        while True:
            '''
            keyboard_status = keyboard.is_pressed(self.keyboard_key)
            with self.hd_lock:
                self.shared_hd_status = keyboard_status
            '''
            self.shared_hd_status = keyboard.is_pressed(self.keyboard_key)
            time.sleep(0.001)

            global t, last_status
            if t % 2 == 0 and not self.shared_hd_status and last_status:
                print(f"press time: {datetime.datetime.now()}")
            last_status = self.shared_hd_status
            t += 1

90
takway/board/orangepi.py Normal file

@@ -0,0 +1,90 @@
from takway.board.base_hd import BaseHardware
import subprocess
import datetime
import threading

t = 0
last_status = False

class OrangePi(BaseHardware):
    def __init__(self, hd_trigger='button', hd_detect_threshold=50):
        super().__init__(hd_trigger, hd_detect_threshold)
        self.BUTTON_PIN_red = 6
        self.LED_PIN_red = 2
        self.BUTTON_PIN_blue = 8
        self.LED_PIN_blue = 5

        self.shared_hd_status_2 = False
        self.led_set_status_2 = False

        self.button_init()
        self.init_hd_thread()

    def button_init(self):
        subprocess.run(["gpio", "mode", str(self.LED_PIN_red), "out"])
        subprocess.run(["gpio", "mode", str(self.BUTTON_PIN_red), "in"])
        subprocess.run(["gpio", "mode", str(self.LED_PIN_blue), "out"])
        subprocess.run(["gpio", "mode", str(self.BUTTON_PIN_blue), "in"])

    def init_hd_thread(self):
        hd_threads = [threading.Thread(target=self.hd_detection_loop),
                      threading.Thread(target=self.hd_detection_loop_2)]
        for hd_thread in hd_threads:
            hd_thread.start()

    @property
    def button_status(self):
        return self.shared_hd_status

    def hd_detection_loop(self):
        keyboard_status = False
        while True:
            self.shared_hd_status = subprocess.run(
                ["gpio", "read", str(self.BUTTON_PIN_red)],
                capture_output=True, text=True).stdout.strip() == '0'
            if self.shared_hd_status:
                # button pressed: drive the LED high
                subprocess.run(["gpio", "write", str(self.LED_PIN_red), "1"])
            else:
                # button released: drive the LED low
                subprocess.run(["gpio", "write", str(self.LED_PIN_red), "0"])

            global t, last_status
            if not self.shared_hd_status and last_status:
                print(f"press time: {datetime.datetime.now()}")
            last_status = self.shared_hd_status
            t += 1

    @property
    def button2_status(self):
        return self.shared_hd_status_2

    def hd_detection_loop_2(self):
        keyboard_status = False
        while True:
            if self.led_set_status_2:
                self.set_led2_on()
                continue
            self.shared_hd_status_2 = subprocess.run(
                ["gpio", "read", str(self.BUTTON_PIN_blue)],
                capture_output=True, text=True).stdout.strip() == '0'
            if self.shared_hd_status_2:
                # button pressed: drive the LED high
                subprocess.run(["gpio", "write", str(self.LED_PIN_blue), "1"])
            else:
                # button released: drive the LED low
                subprocess.run(["gpio", "write", str(self.LED_PIN_blue), "0"])

    def set_led1_on(self):
        subprocess.run(["gpio", "write", str(self.LED_PIN_red), "1"])

    def set_led1_off(self):
        subprocess.run(["gpio", "write", str(self.LED_PIN_red), "0"])

    def set_led2_on(self):
        self.led_set_status_2 = True
        subprocess.run(["gpio", "write", str(self.LED_PIN_blue), "1"])

    def set_led2_off(self):
        self.led_set_status_2 = False
        subprocess.run(["gpio", "write", str(self.LED_PIN_blue), "0"])

58
takway/board/sipeed.py Normal file

@@ -0,0 +1,58 @@
import sys
import warnings
import threading
import time
from collections import deque
from takway.board.base_hd import BaseHardware

if "gpiod" in sys.modules:
    # Sipeed MaixSense V329
    import gpiod as gpio
else:
    # if the library is not available, fall back to a warning (or raise)
    # raise ImportError("gpiod package is not available.")
    warnings.warn("gpiod package is not available.")

class V329(BaseHardware):
    def __init__(self, hd_trigger='button', hd_detect_threshold=50):
        super().__init__(hd_trigger, hd_detect_threshold)
        self.button = self.button_init()
        self.init_hd_thread()

    def button_init(self):
        PH_BASE = (8 - 1) * 32  # PH
        gpiochip1 = gpio.chip("gpiochip1")
        button = gpiochip1.get_line(PH_BASE + 5)
        config = gpio.line_request()
        config.request_type = gpio.line_request.DIRECTION_INPUT
        config.flags = gpio.line_request.FLAG_BIAS_PULL_UP
        button.request(config)
        return button

    @property
    def button_status(self):
        return self.button.get_value() == 1

    def hd_detection_loop(self):
        self.shared_hd_status = False
        button_value_list = deque(maxlen=self.hd_detect_threshold)
        while True:
            if len(button_value_list) > button_value_list.maxlen:
                button_value_list.popleft()
            button_value_list.append(self.button_status)
            # keep the last 50 readings; if all of them are True, treat the button as pressed
            if button_value_list.count(True) == button_value_list.maxlen:
                with self.hd_lock:
                    self.shared_hd_status = True
            # if all 50 readings are False, treat the button as released
            if button_value_list.count(False) == button_value_list.maxlen:
                with self.hd_lock:
                    self.shared_hd_status = False
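
The deque-based debounce above is easy to sanity-check off-board; an illustrative sketch with made-up readings and a threshold of 5 instead of 50:

from collections import deque

readings = [True, True, False, True, True, True, True, True]  # made-up pin samples
buf = deque(maxlen=5)
pressed = False
for r in readings:
    buf.append(r)                          # old samples fall out automatically
    if buf.count(True) == buf.maxlen:      # 5 consecutive True -> pressed
        pressed = True
    if buf.count(False) == buf.maxlen:     # 5 consecutive False -> released
        pressed = False
print(pressed)  # True: the last five samples are all True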

96
takway/common_utils.py Normal file

@@ -0,0 +1,96 @@
# ############################################################# #
# format table function
# ############################################################# #
def format_table(header, rows):
    # column width: longest word length + 2
    col_width = max(len(str(word)) for row in rows for word in row) + 2
    # print the header
    print("".join(word.ljust(col_width) for word in header))
    # print the separator line
    print("".join("-" * col_width for _ in header))
    # print the rows
    for row in rows:
        print("".join(str(word).ljust(col_width) for word in row))

# ############################################################# #
# encode and decode bytes and string
# ############################################################# #
import base64

def encode_bytes2str(data):
    # encode bytes as a Base64 string
    if data is None:
        return None
    return base64.b64encode(data).decode('utf-8')

def decode_str2bytes(data):
    # decode a Base64-encoded string back to bytes
    if data is None:
        return None
    return base64.b64decode(data.encode('utf-8'))
import re

def split_sentences(text: str):
    # regular expression for the Chinese punctuation marks used as sentence delimiters
    pattern = r'[,。!?;:、《》]+'
    # split the text on the punctuation marks
    sentences = re.split(pattern, text)
    # drop empty strings
    sentences = [sentence for sentence in sentences if sentence]
    return sentences
'''
# Example text
text = "今天天气真好,我们去公园玩吧!你觉得怎么样?好的,那就这么定了。"
# split it into sentences
sentences = split_sentences(text)
print(sentences)
'''

def split_chinese_text(text: str, return_patch=False):
    # set of Chinese punctuation marks
    punctuations = set('。!?,;:、“”()《》【】')
    # result lists for the sentences and the punctuation marks
    sentences = []
    punctuation_list = []
    text_patch = []
    start = 0  # start position of the current sentence
    for i, char in enumerate(text):
        if char in punctuations:
            # the current character is a punctuation mark: close the sentence and record it
            sentences.append(text[start:i+1])
            punctuation_list.append(char)
            start = i + 1  # update the start of the next sentence
    # handle the last sentence (if the text does not end with a punctuation mark)
    if start < len(text):
        sentences.append(text[start:])

    if return_patch:
        if len(punctuation_list) == 0:
            return [text], False  # a fragment is left over
        elif len(sentences) == len(punctuation_list):
            return [''.join(sentences)], True
        else:
            return [''.join(sentences[:-1]), sentences[-1]], True
    return sentences, punctuation_list

'''
# Example text
text = "你好,世界!今天天气怎么样?希望你有一个美好的一天。"
sentences, punctuation_list = split_chinese_text(text)
print("sentences:", sentences)
print("punctuation marks:", punctuation_list)
'''
def remove_brackets_and_contents(text):
    # replace bracketed spans (including the brackets) with an empty string
    result = re.sub(r'\(.*?\)', '', text)
    result = re.sub(r'(.*?)', '', result)
    result = re.sub(r'【.*?】', '', result)
    return result
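
For reference, a quick illustrative call to format_table (made-up rows):

header = ["name", "rate"]
rows = [["mic", 16000], ["speaker", 22050]]
format_table(header, rows)
# prints the header, a dashed separator, and one aligned row per entry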

Binary file not shown.

takway/stt/modified_funasr.py

@@ -10,6 +10,8 @@ class ModifiedRecognizer(FunAutoSpeechRecognizer):
                 use_punct=True,
                 use_emotion=False,
                 use_speaker_ver=True):
        # build the base funasr model for speech recognition; it produces unpunctuated text
        super().__init__(
            model_path="paraformer-zh-streaming",
            device="cuda",
@@ -19,18 +21,28 @@ class ModifiedRecognizer(FunAutoSpeechRecognizer):
            chunk_ms=480,
            encoder_chunk_look_back=4,
            decoder_chunk_look_back=1)

        # record which optional features are enabled
        self.use_punct = use_punct
        self.use_emotion = use_emotion
        self.use_speaker_ver = use_speaker_ver

        # punctuation restoration model
        if use_punct:
            self.puctuation_model = Punctuation(**CTTRANSFORMER)
        # emotion recognition model
        if use_emotion:
            self.emotion_model = Emotion(**FUNASRFINETUNE)
        # speaker verification model
        if use_speaker_ver:
            self.speaker_ver_model = speaker_verfication(**ERES2NETV2)

    def initialize_speaker(self, speaker_1_wav):
        """
        Register the input audio (speaker_1_wav) as the target speaker and save its embedding locally.
        """
        if not self.use_speaker_ver:
            raise NotImplementedError("no access")
        if speaker_1_wav.endswith(".npy"):
@@ -45,35 +57,52 @@ class ModifiedRecognizer(FunAutoSpeechRecognizer):
    def speaker_ver(self, speaker_2_wav):
        """
        Speaker verification: decide whether the input audio comes from the target speaker.
        Returns True if it does, False otherwise.
        """
        if not self.use_speaker_ver:
            raise NotImplementedError("no access")
        if not hasattr(self, "save_speaker_path"):
            raise NotImplementedError("please initialize speaker first")

        # pdb.set_trace()
        # self.speaker_ver_model.verfication returns the string 'yes' / 'no'
        return self.speaker_ver_model.verfication(base_emb=self.save_speaker_path,
                                                  speaker_2_wav=speaker_2_wav) == 'yes'

    def recognize(self, audio_data):
        """
        Non-streaming speech recognition; returns the recognized text as a str.
        """
        audio_data = self.check_audio_type(audio_data)

        # speaker verification
        if self.use_speaker_ver:
            if self.speaker_ver_model.verfication(self.save_speaker_path,
                                                  speaker_2_wav=audio_data) == 'no':
                return "Other People"

        # speech recognition
        result = self.asr_model.generate(input=audio_data,
                                         batch_size_s=300,
                                         hotword=self.hotwords)
        text = ''
        for res in result:
            text += res['text']

        # add punctuation
        if self.use_punct:
            text = self.puctuation_model.process(text + '#', append_period=False).replace('#', '')
        return text

    def recognize_emotion(self, audio_data):
        """
        Emotion recognition. Returns:
        1. the string "Other People" if the speaker is not the target speaker;
        2. otherwise a dict: {"labels": List[str], "scores": List[int]}
        """
        audio_data = self.check_audio_type(audio_data)
        if self.use_speaker_ver:
@@ -93,14 +122,20 @@ class ModifiedRecognizer(FunAutoSpeechRecognizer):
            audio_data: bytes or numpy array, partial audio data
            is_end: bool, whether the audio data is the end of a sentence
            auto_det_end: bool, whether to automatically detect the end of a audio data

        Streaming speech recognition. Returns:
        1. the string "Other People" if the speaker is not the target speaker;
        2. otherwise a dict: {"text": List[str], "is_end": boolean}
        """
        audio_data = self.check_audio_type(audio_data)

        # speaker verification
        if self.use_speaker_ver:
            if self.speaker_ver_model.verfication(self.save_speaker_path,
                                                  speaker_2_wav=audio_data) == 'no':
                return "Other People"

        # speech recognition
        text_dict = dict(text=[], is_end=is_end)

        if self.audio_cache is None:
@@ -145,6 +180,8 @@ class ModifiedRecognizer(FunAutoSpeechRecognizer):
                except ValueError as e:
                    print(f"ValueError: {e}")
                    continue
                # append punctuation
                if self.use_punct:
                    text_dict['text'].append(self.puctuation_model.process(self.text_postprecess(res[0], data_id='text'), cache=text_dict))
                else:


@@ -10,7 +10,7 @@ ERES2NETV2 = {
}
# path where the speaker embedding is saved
DEFALUT_SAVE_PATH = r"D:\python\irving\takway_base-main\examples"
DEFALUT_SAVE_PATH = r".\takway\savePath"

class speaker_verfication:
    def __init__(self,


@@ -1,120 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ####################################################### #
# VOSKAutoSpeechRecognizer
# ####################################################### #
import json
import wave
import io
import os
from vosk import Model, KaldiRecognizer, SetLogLevel
from .base_stt import STTBase
from ..common_utils import decode_str2bytes

class VOSKAutoSpeechRecognizer(STTBase):
    def __init__(self, model_path="vosk-model-small-cn-0.22", RATE=16000, cfg_path=None, efficent_mode=True, debug=False):
        super().__init__(self, model_path=model_path, RATE=RATE, cfg_path=cfg_path, debug=debug)
        self.asr_model = AutoModel(model="paraformer-zh-streaming")
        self.apply_asr_config(self.asr_cfg)

    def recognize_keywords(self, audio_data, partial_size=None, queue=None):
        """recognize keywords in audio data"""
        audio_data = self.check_audio_type(audio_data)
        if partial_size is None:
            rec_result = self.recognize(audio_data, queue)
            rec_text = self.result_postprecess(rec_result)
        else:
            rec_result = self.partial_recognize(audio_data, partial_size, queue)
            rec_text = self.result_postprecess(rec_result, 'partial')
        print(f"rec_text: {rec_text}")
        if rec_text != '':
            print(f"rec_text: {rec_text}")
            if any(keyword in rec_text for keyword in self.keywords):
                print("Keyword detected.")
                return True, rec_text
        else:
            return False, None

    def recognize(self, audio_data, queue=None):
        """recognize audio data to text"""
        audio_data = self.check_audio_type(audio_data)
        self.asr.AcceptWaveform(audio_data)
        result = json.loads(self.asr.FinalResult())
        # TODO: put result to queue
        return result

    def partial_recognize(self, audio_data, partial_size=1024, queue=None):
        """recognize partial result"""
        audio_data = self.check_audio_type(audio_data)
        text_dict = dict(
            text=[],
            partial=[],
            final=[],
            is_end=False)
        # split the audio into chunks and recognize them one by one
        for i in range(0, len(audio_data), partial_size):
            # print(f"partial data: {i} - {i+partial_size}")
            data = audio_data[i:i+partial_size]
            if len(data) == 0:
                break
            if self.asr.AcceptWaveform(data):
                result = json.loads(self.asr.Result())
                if result['text'] != '':
                    text_dict['text'].append(result['text'])
                    if queue is not None:
                        queue.put(('stt_info', text_dict))
                    # print(f"text result: {result}")
            else:
                result = json.loads(self.asr.PartialResult())
                if result['partial'] != '':
                    # text_dict['partial'].append(result['partial'])
                    text_dict['partial'] = [result['partial']]
                    if queue is not None:
                        queue.put(('stt_info', text_dict))
                    # print(f"partial result: {result}")
        # final recognize
        final_result = json.loads(self.asr.FinalResult())
        if final_result['text'] != '':
            text_dict['final'].append(final_result['text'])
            text_dict['text'].append(final_result['text'])
        text_dict['is_end'] = True
        print(f"final dict: {text_dict}")
        if queue is not None:
            queue.put(('stt_info', text_dict))
        return text_dict

if __name__ == "__main__":
    '''
    wav_file_path = "recording.wav"
    # You can set log level to -1 to disable debug messages
    SetLogLevel(0)
    model = Model(model_path="vosk-model-small-cn-0.22")
    # record the audio
    # record_audio(wav_file_path)
    data = record_audio()
    # transcribe the audio
    result = audio_to_text(data, model)
    print("-------------")
    print(result)
    '''
    from takway.audio_utils import Recorder
    rec = Recorder()

    return_type = 'bytes'
    data = rec.record(return_type)
    print(type(data))

    asr = AutoSpeechRecognizer()
    # asr.recognize(data)
    asr.add_keyword("你好")
    asr.recognize_keywords(data)