diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/demo.py b/demo.py new file mode 100644 index 0000000..71710b5 --- /dev/null +++ b/demo.py @@ -0,0 +1,29 @@ +from takway.audio_utils import BaseRecorder +from takway.stt.funasr_utils import FunAutoSpeechRecognizer +from takway.stt.modified_funasr import ModifiedRecognizer +def asr_file_stream(file_path=r'.\examples\example_recording.wav'): + rec = BaseRecorder() + + data = rec.load_audio_file(file_path) + + asr = ModifiedRecognizer(use_punct=True, use_emotion=True, use_speaker_ver=True) + + asr.initialize_speaker(r".\examples\example_recording.wav") + + text_dict = asr.streaming_recognize(data, auto_det_end=True) + + print("===============================================") + print(f"text_dict: {text_dict}") + + if not isinstance(text_dict, str): + print("".join(text_dict['text'])) + + print("===============================================") + emotion_dict = asr.recognize_emotion(data) + print(f"emotion_dict: {emotion_dict}") + if not isinstance(emotion_dict, str): + max_index = emotion_dict['scores'].index(max(emotion_dict['scores'])) + print("emotion: " +emotion_dict['labels'][max_index]) + + +asr_file_stream() \ No newline at end of file diff --git a/examples/example_recording.wav b/examples/example_recording.wav new file mode 100644 index 0000000..2f16668 Binary files /dev/null and b/examples/example_recording.wav differ diff --git a/takway/audio_utils.py b/takway/audio_utils.py new file mode 100644 index 0000000..cf404b7 --- /dev/null +++ b/takway/audio_utils.py @@ -0,0 +1,578 @@ +import io +import os +import time +import pyaudio +import wave +import json +import warnings +import threading +import numpy as np +from collections import deque + +from .common_utils import encode_bytes2str, decode_str2bytes + +from takway.board import * +try: + import keyboard +except: + pass + +def play_audio(audio_data, type='base64'): + ''' + 读取base64编码的音频流并播放 + ''' + # PyAudio配置 + p = pyaudio.PyAudio() + stream = p.open(format=pyaudio.paInt16, channels=1, rate=22050, output=True) + + # 播放音频 + stream.write(audio_data) + stream.stop_stream() + stream.close() + p.terminate() + +''' +import librosa +def reshape_sample_rate(audio, sr_original=None, sr_target=16000): + # 获取原始采样率和音频数据 + if isinstance(audio, tuple): + sr_original, audio_data = audio + elif isinstance(audio, bytes): + audio_data = np.frombuffer(audio, dtype=np.int16) + assert sr_original is not None, f"sr_original should be provided if audio is a \ + numpy.ndarray, but got sr_original `{sr_original}`." + + if isinstance(audio_data, np.ndarray): + if audio_data.dtype == np.dtype('int16'): + audio_data = audio_data.astype(np.float32) / np.iinfo(np.int16).max + assert audio_data.dtype == np.dtype('float32'), f"audio_data should be float32, \ + but got {audio_data.dtype}." + else: + raise TypeError(f"audio_data should be numpy.ndarray, but got {type(audio_data)}.") + + # 重新采样音频数据 + audio_data_resampled = librosa.resample(audio_data, orig_sr=sr_original, target_sr=sr_target) + + if audio_data_resampled.dtype == np.dtype('float32'): + audio_data_resampled = np.int16(audio_data_resampled * np.iinfo(np.int16).max) + + # If the input was bytes, return the resampled data as bytes + if isinstance(audio, bytes): + audio_data_resampled = audio_data_resampled.tobytes() + + return audio_data_resampled + +# Example usage: +# If your audio data is in bytes: +# audio_bytes = b'...' 
# Your audio data as bytes +# audio_data_resampled = reshape_sample_rate(audio_bytes) + +# If your audio data is in numpy int16: +# audio_int16 = np.array([...], dtype=np.int16) # Your audio data as numpy int16 +# audio_data_resampled = reshape_sample_rate(audio_int16) +''' + + + +# ####################################################### # +# base audio class +# ####################################################### # + +class BaseAudio: + def __init__(self, + filename=None, + input=False, + output=False, + CHUNK=1024, + FORMAT=pyaudio.paInt16, + CHANNELS=1, + RATE=16000, + input_device_index=None, + output_device_index=None, + **kwargs): + self.CHUNK = CHUNK + self.FORMAT = FORMAT + self.CHANNELS = CHANNELS + self.RATE = RATE + self.filename = filename + assert input!= output, "input and output cannot be the same, \ + but got input={} and output={}.".format(input, output) + print("------------------------------------------") + print(f"{'Input' if input else 'Output'} Audio Initialization: ") + print(f"CHUNK: {self.CHUNK} \nFORMAT: {self.FORMAT} \nCHANNELS: {self.CHANNELS} \nRATE: {self.RATE} \ninput_device_index: {input_device_index} \noutput_device_index: {output_device_index}") + print("------------------------------------------") + self.p = pyaudio.PyAudio() + self.stream = self.p.open(format=FORMAT, + channels=CHANNELS, + rate=RATE, + input=input, + output=output, + input_device_index=input_device_index, + output_device_index=output_device_index, + **kwargs) + + def load_audio_file(self, wav_file): + with wave.open(wav_file, 'rb') as wf: + params = wf.getparams() + frames = wf.readframes(params.nframes) + print("Audio file loaded.") + # Audio Parameters + # print("Channels:", params.nchannels) + # print("Sample width:", params.sampwidth) + # print("Frame rate:", params.framerate) + # print("Number of frames:", params.nframes) + # print("Compression type:", params.comptype) + return frames + + def check_audio_type(self, audio_data, return_type=None): + assert return_type in ['bytes', 'io', None], \ + "return_type should be 'bytes', 'io' or None." + if isinstance(audio_data, str): + if len(audio_data) > 50: + audio_data = decode_str2bytes(audio_data) + else: + assert os.path.isfile(audio_data), \ + "audio_data should be a file path or a bytes object." + wf = wave.open(audio_data, 'rb') + audio_data = wf.readframes(wf.getnframes()) + elif isinstance(audio_data, np.ndarray): + if audio_data.dtype == np.dtype('float32'): + audio_data = np.int16(audio_data * np.iinfo(np.int16).max) + audio_data = audio_data.tobytes() + elif isinstance(audio_data, bytes): + pass + else: + raise TypeError(f"audio_data must be bytes, numpy.ndarray or str, \ + but got {type(audio_data)}") + + if return_type == None: + return audio_data + return self.write_wave(None, [audio_data], return_type) + + def write_wave(self, filename, frames, return_type='io'): + """Write audio data to a file.""" + if isinstance(frames, bytes): + frames = [frames] + if not isinstance(frames, list): + raise TypeError("frames should be \ + a list of bytes or a bytes object, \ + but got {}.".format(type(frames))) + + if return_type == 'io': + if filename is None: + filename = io.BytesIO() + if self.filename: + filename = self.filename + return self.write_wave_io(filename, frames) + elif return_type == 'bytes': + return self.write_wave_bytes(frames) + + + def write_wave_io(self, filename, frames): + """ + Write audio data to a file-like object. 
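+        If `filename` is an io.BytesIO object, the buffer is rewound (seek(0)) before it is returned.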
+ + Args: + filename: [string or file-like object], file path or file-like object to write + frames: list of bytes, audio data to write + """ + wf = wave.open(filename, 'wb') + + # 设置WAV文件的参数 + wf.setnchannels(self.CHANNELS) + wf.setsampwidth(self.p.get_sample_size(self.FORMAT)) + wf.setframerate(self.RATE) + wf.writeframes(b''.join(frames)) + wf.close() + if isinstance(filename, io.BytesIO): + filename.seek(0) # reset file pointer to beginning + return filename + + def write_wave_bytes(self, frames): + """Write audio data to a bytes object.""" + return b''.join(frames) + + +# ####################################################### # +# play audio data from Speaker +# ####################################################### # + +class AudioPlayer(BaseAudio): + def __init__(self, + RATE=22050, + **kwargs): + super().__init__(output=True, RATE=RATE, **kwargs) + + def play(self, audio_data): + # print("Playing audio data...") + audio_data = self.check_audio_type(audio_data, return_type=None) + + for i in range(0, len(audio_data), self.CHUNK): + self.stream.write(audio_data[i:i+self.CHUNK]) + print("Playing audio data...{}/{}".format(i, len(audio_data))) + self.stream.write(audio_data[i+self.CHUNK:]) + # print("Audio data played.") + + + def close(self): + self.stream.stop_stream() + self.stream.close() + self.p.terminate() + +# ####################################################### # +# record audio data from microphone +# ####################################################### # +class BaseRecorder(BaseAudio): + def __init__(self, + input=True, + base_chunk_size=None, + RATE=16000, + **kwargs): + super().__init__(input=input, RATE=RATE, **kwargs) + self.base_chunk_size = base_chunk_size + if base_chunk_size is None: + self.base_chunk_size = self.CHUNK + + def record(self, + filename, + duration=5, + return_type='io', + logger=None): + if logger is not None: + logger.info("Recording started.") + else: + print("Recording started.") + frames = [] + for i in range(0, int(self.RATE / self.CHUNK * duration)): + data = self.stream.read(self.CHUNK, exception_on_overflow=False) + frames.append(data) + if logger is not None: + logger.info("Recording stopped.") + else: + print("Recording stopped.") + return self.write_wave(filename, frames, return_type) + + def record_chunk_voice(self, + return_type='bytes', + CHUNK=None, + exception_on_overflow=True, + queue=None): + data = self.stream.read(self.CHUNK if CHUNK is None else CHUNK, + exception_on_overflow=exception_on_overflow) + if return_type is not None: + return self.write_wave(None, [data], return_type) + return data + + +class HDRecorder(BaseRecorder): + def __init__(self, + board=None, + hd_trigger='keyboard', + keyboard_key='space', + voice_trigger=True, + hd_chunk_size=None, + hd_detect_threshold=50, + **kwargs): + super().__init__(**kwargs) + assert hd_trigger in ['keyboard', 'button'] + + self.hd_trigger = hd_trigger + self.voice_trigger = voice_trigger + + self.hd_chunk_size = hd_chunk_size + if hd_chunk_size is None: + self.hd_chunk_size = self.base_chunk_size + + if board == None: + assert hd_trigger == 'keyboard', "board should be `None` if hd_trigger is `keyboard`." + self.keyboard_key = keyboard_key + self.hardware = Keyboard(hd_trigger, keyboard_key, hd_detect_threshold) + else: + assert hd_trigger == 'button', f"hd_trigger should be `button` if board is `v329` or `orangepi`, but got `{hd_trigger}`." 
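+            # Map the board name to its button-trigger helper class (see takway/board/)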
+ if board == 'v329': + self.hardware = V329(hd_trigger, hd_detect_threshold) + elif board == 'orangepi': + self.hardware = OrangePi(hd_trigger, hd_detect_threshold) + print(f"Using {hd_trigger} as hardware trigger.") + + def wait_for_hardware_pressed(self): + return self.hardware.wait_for_hardware_pressed() + + @property + def is_hardware_pressed(self): + return self.hardware.is_hardware_pressed + + def record_hardware(self, return_type='bytes'): + """record audio when hardware trigger""" + print("Recording started for hardware trigger.") + frames = [] + self.wait_for_hardware_pressed() + while True: + if self.hd_trigger == 'keyboard': + if keyboard.is_pressed(self.keyboard_key): + print("recording...") + data = self.record_chunk_voice( + CHUNK=self.CHUNK, + return_type=None, + exception_on_overflow=False) + frames.append(data) + else: + break + print("Recording stopped.") + elif self.hd_trigger == 'button': + if self.get_button_status(): + data = self.stream.read(self.CHUNK) + frames.append(data) + else: + break + else: + recording = False + raise ValueError("hd_trigger should be 'keyboard' or 'button'.") + return self.write_wave(self.filename, frames, return_type) + + ''' + def record(self, return_type='bytes', queue=None): + if self.hd_trigger == 'all': + value_list = [] # 用于记录value的状态 + if keyboard.is_pressed(self.keyboard_key): + audio_data = self.record_keyboard(return_type, queue) + elif self.button.get_value() == 0: + if self.get_button_status(): + audio_data = self.record_button(return_type, queue) + else: + audio_data = self.record_voice(return_type, queue) + elif self.hd_trigger == 'keyboard': + print("Press SPACE to start recording.") + keyboard.wait("space") + audio_data = self.record_keyboard(return_type, queue) + elif self.hd_trigger == 'button': + print("Touch to start recording...") + if self.button.get_value() == 0: + if self.get_button_status(): + audio_data = self.record_button(return_type, queue) + else: + audio_data = self.record_voice(return_type, queue) + + return audio_data + + def record_keyboard(self, return_type='bytes', queue=None): + """record audio when keyboard pressing""" + print("Recording started.") + frames = [] + recording = True + while recording: + if keyboard.is_pressed(self.keyboard_key): + data = self.stream.read(self.CHUNK) + frames.append(data) + else: + recording = False + print("Recording stopped.") + return self.write_wave(self.filename, frames, return_type) + + def record_button(self, return_type='bytes', queue=None): + """record audio when button pressing""" + print("Recording started.") + frames = [] + recording = True + while recording: + value = self.button.get_value() + if value == 0: + data = self.stream.read(CHUNK) + frames.append(data) + else: + recording = False + print("Recording stopped.") + return self.write_wave(self.filename, frames, return_type) + ''' + +# ####################################################### # +# record audio data from microphone with VAD +# ####################################################### # +try: + import webrtcvad + webrtcvad_available = True +except: + warnings.warn("webrtcvad module not found, please install it if use `vad` hd_trigger.") + webrtcvad_available = False + +class VADRecorder(HDRecorder): + def __init__(self, vad_sensitivity=1, frame_duration=30, vad_buffer_size=7, min_act_time=1,**kwargs): + super().__init__(**kwargs) + if webrtcvad_available: + self.vad = webrtcvad.Vad(vad_sensitivity) + self.vad_buffer_size = vad_buffer_size + self.vad_chunk_size = int(self.RATE * frame_duration / 
1000) + + self.min_act_time = min_act_time # 最小活动时间,单位秒 + + self.is_currently_speaking = False + self.frames = [] + + def is_speech(self, data): + return self.vad.is_speech(data, self.RATE) + + def vad_filter(self, data): + pass + + + def vad_record(self, return_type='io', CHUNK=None, queue=None, save_file=False): + """录音并进行语音活动检测人声并返回分割后的音频数据""" + all_frames = [] + + buffer_size = self.vad_buffer_size + active_buffer = deque([False for i in range(buffer_size)], maxlen=buffer_size) + audio_buffer = deque(maxlen=buffer_size) + silence_buffer = deque([True for i in range(buffer_size)], maxlen=buffer_size) + + print("vad_recorded_audio VAD started. Press Ctrl+C to stop.") + try: + while True: + data = self.stream.read(self.vad_chunk_size) + all_frames.append(data) + print(f"VAD processing..., is_speech: {self.is_speech(data)}") + if self.is_speech(data): + # 标志位buffer + active_buffer.append(True); active_buffer.popleft() + silence_buffer.append(False); silence_buffer.popleft() + # 暂时增加到buffer中 + audio_buffer.append(data) + # 如果满足检测要求 + if all(active_buffer): + if not self.is_currently_speaking: + print("Speech start detected") + self.is_currently_speaking = True + self.frames.extend(audio_buffer) # 把说话的buffer也加上 + if self.is_currently_speaking: + self.frames.append(data) + else: + # 标志位buffer + active_buffer.append(False); active_buffer.popleft() + silence_buffer.append(True); silence_buffer.popleft() + # 检测到人声并持续录音 + if self.is_currently_speaking: + # 结束标志位 + if all(silence_buffer): + print("Speech end detected") + break + except KeyboardInterrupt: + print("KeyboardInterrupt") + + finally: + print("Stopping...") + if len(all_frames) > 0: + print(f"ALL frame: {len(all_frames)}") + print(f"ASR frame: {len(self.frames)}") + if save_file: + self.write_wave(f"output_{time.time()}_all.wav", all_frames) + self.write_wave(f"output_{time.time()}.wav", self.frames) + return self.write_wave(None, self.frames, return_type='bytes') + + +# ####################################################### # +# record audio data from microphone with PicoVoice hot words detection +# ####################################################### # + +import struct +from datetime import datetime +import pvporcupine + +class PicovoiceRecorder(VADRecorder): + def __init__(self, + access_key, + keywords=None, + keyword_paths=None, + model_path=None, + sensitivities=0.5, + library_path=None, + **kwargs): + + super().__init__(**kwargs) + + pico_cfg = dict( + access_key=access_key, + keywords=keywords, + keyword_paths=keyword_paths, + model_path=model_path, + sensitivities=sensitivities, + library_path=library_path, + ) + + self.pico_detector_init(pico_cfg) + + self.keywords = self.pico_cfg['keywords'] + print(f"PicovoiceRecorder initialized with keywords: {self.keywords}") + + def pico_detector_init(self, pico_cfg): + if pico_cfg['keyword_paths'] is None: + if pico_cfg['keywords'] is None: + raise ValueError(f"Either `--keywords` or `--keyword_paths` must be set. 
\ + Available keywords: {list(pvporcupine.KEYWORDS)}") + + keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in pico_cfg['keywords']] + else: + keyword_paths = pico_cfg['keyword_paths'] + + if pico_cfg['sensitivities'] is None: + pico_cfg['sensitivities'] = [0.5] * len(keyword_paths) + elif isinstance(pico_cfg['sensitivities'], float): + pico_cfg['sensitivities'] = [pico_cfg['sensitivities']] * len(keyword_paths) + + if len(keyword_paths) != len(pico_cfg['sensitivities']): + raise ValueError('Number of keywords does not match the number of sensitivities.') + + try: + self.porcupine = pvporcupine.create( + access_key=pico_cfg['access_key'], + keywords=pico_cfg['keywords'], + keyword_paths=keyword_paths, + model_path=pico_cfg['model_path'], + sensitivities=pico_cfg['sensitivities'], + library_path=pico_cfg['library_path']) + except pvporcupine.PorcupineInvalidArgumentError as e: + print("One or more arguments provided to Porcupine is invalid: ", pico_cfg.keys()) + print(e) + raise e + except pvporcupine.PorcupineActivationError as e: + print("AccessKey activation error") + raise e + except pvporcupine.PorcupineActivationLimitError as e: + print("AccessKey '%s' has reached it's temporary device limit" % pico_cfg['access_key']) + raise e + except pvporcupine.PorcupineActivationRefusedError as e: + print("AccessKey '%s' refused" % pico_cfg['access_key']) + raise e + except pvporcupine.PorcupineActivationThrottledError as e: + print("AccessKey '%s' has been throttled" % pico_cfg['access_key']) + raise e + except pvporcupine.PorcupineError as e: + print("Failed to initialize Porcupine") + raise e + + self.pico_cfg = pico_cfg + + def is_wakeup(self, data): + pcm = struct.unpack_from("h" * self.porcupine.frame_length, data) + result = self.porcupine.process(pcm) + # print(f"picovoice result: {result}") + if result >= 0: + print('[%s] Detected %s' % (str(datetime.now()), self.keywords[result])) + return True + # self.write_wave(f"output_{time.time()}.wav", [data]) + # print(f"write to: output_{time.time()}.wav") + return False + + + def record_picovoice(self, return_type=None, exception_on_overflow=False, queue=None): + + print("Recording started. 
Press Ctrl+C to stop.") + while True: + data = self.record_chunk_voice( + return_type=None, + CHUNK=self.porcupine.frame_length, + exception_on_overflow=exception_on_overflow, + queue=queue) + + wake_up = self.is_wakeup(data) + if wake_up: + break + return True diff --git a/takway/board/__init__.py b/takway/board/__init__.py new file mode 100644 index 0000000..9f1a159 --- /dev/null +++ b/takway/board/__init__.py @@ -0,0 +1,4 @@ +from .base_hd import BaseHardware +from .keyboard import Keyboard +from .sipeed import V329 +from .orangepi import OrangePi \ No newline at end of file diff --git a/takway/board/base_hd.py b/takway/board/base_hd.py new file mode 100644 index 0000000..385b8b9 --- /dev/null +++ b/takway/board/base_hd.py @@ -0,0 +1,32 @@ +import threading +import time + +class BaseHardware: + def __init__(self, hd_trigger=None, hd_detect_threshold=50): + self.hd_trigger = hd_trigger + self.hd_detect_threshold = hd_detect_threshold + + self.hd_lock = threading.Lock() + self.shared_hd_status = False + + + def init_hd_thread(self): + hd_thread = threading.Thread(target=self.hd_detection_loop) + hd_thread.start() + # hd_thread.join() + print("HD detection thread started.") + + def hd_detection_loop(self): + pass + + @property + def is_hardware_pressed(self): + return self.shared_hd_status + + def wait_for_hardware_pressed(self): + print("Waiting for hardware trigger.") + while True: + if self.is_hardware_pressed: + time.sleep(0.01) + break + return True \ No newline at end of file diff --git a/takway/board/keyboard.py b/takway/board/keyboard.py new file mode 100644 index 0000000..443f9a2 --- /dev/null +++ b/takway/board/keyboard.py @@ -0,0 +1,32 @@ +import keyboard +import time + +from takway.board.base_hd import BaseHardware + +import datetime +t=0 +last_status = False + +class Keyboard(BaseHardware): + def __init__(self, hd_trigger='keyboard', keyboard_key='space', hd_detect_threshold=50): + super().__init__(hd_trigger, hd_detect_threshold) + + self.keyboard_key = keyboard_key + self.init_hd_thread() + + def hd_detection_loop(self): + keyboard_status = False + while True: + ''' + keyboard_status = keyboard.is_pressed(self.keyboard_key) + with self.hd_lock: + self.shared_hd_status = keyboard_status + ''' + self.shared_hd_status = keyboard.is_pressed(self.keyboard_key) + time.sleep(0.001) + + global t, last_status + if t%2 == 0 and not self.shared_hd_status and last_status: + print(f"pres time: {datetime.datetime.now()}") + last_status = self.shared_hd_status + t+=1 \ No newline at end of file diff --git a/takway/board/orangepi.py b/takway/board/orangepi.py new file mode 100644 index 0000000..b7350a2 --- /dev/null +++ b/takway/board/orangepi.py @@ -0,0 +1,90 @@ +from takway.board.base_hd import BaseHardware +import subprocess +import datetime +import threading + +t=0 +last_status = False + +class OrangePi(BaseHardware): + def __init__(self, hd_trigger='button', hd_detect_threshold=50): + super().__init__(hd_trigger, hd_detect_threshold) + + self.BUTTON_PIN_red = 6 + self.LED_PIN_red = 2 + + self.BUTTON_PIN_blue = 8 + self.LED_PIN_blue = 5 + + self.shared_hd_status_2 = False + self.led_set_status_2 = False + + self.button_init() + self.init_hd_thread() + + def button_init(self): + subprocess.run(["gpio", "mode", str(self.LED_PIN_red), "out"]) + subprocess.run(["gpio", "mode", str(self.BUTTON_PIN_red), "in"]) + + subprocess.run(["gpio", "mode", str(self.LED_PIN_blue), "out"]) + subprocess.run(["gpio", "mode", str(self.BUTTON_PIN_blue), "in"]) + + + def init_hd_thread(self): + hd_threads = 
[threading.Thread(target=self.hd_detection_loop), + threading.Thread(target=self.hd_detection_loop_2)] + for hd_thread in hd_threads: + hd_thread.start() + + @property + def button_status(self): + return self.shared_hd_status + + def hd_detection_loop(self): + keyboard_status = False + while True: + self.shared_hd_status = True if subprocess.run(["gpio", "read", str(self.BUTTON_PIN_red)], capture_output=True, text=True).stdout.strip() == '0' else False + if self.shared_hd_status: + # 打开LED(输出高电平) + subprocess.run(["gpio", "write", str(self.LED_PIN_red), "1"]) + else: + # 关闭LED(输出低电平) + subprocess.run(["gpio", "write", str(self.LED_PIN_red), "0"]) + + global t, last_status + if not self.shared_hd_status and last_status: + print(f"pres time: {datetime.datetime.now()}") + last_status = self.shared_hd_status + t+=1 + + @property + def button2_status(self): + return self.shared_hd_status_2 + + def hd_detection_loop_2(self): + keyboard_status = False + while True: + if self.led_set_status_2: + self.set_led2_on() + continue + self.shared_hd_status_2 = True if subprocess.run(["gpio", "read", str(self.BUTTON_PIN_blue)], capture_output=True, text=True).stdout.strip() == '0' else False + if self.shared_hd_status_2: + # 打开LED(输出高电平) + subprocess.run(["gpio", "write", str(self.LED_PIN_blue), "1"]) + else: + # 关闭LED(输出低电平) + subprocess.run(["gpio", "write", str(self.LED_PIN_blue), "0"]) + + def set_led1_on(self): + subprocess.run(["gpio", "write", str(self.LED_PIN_red), "1"]) + + def set_led1_off(self): + subprocess.run(["gpio", "write", str(self.LED_PIN_red), "0"]) + + def set_led2_on(self): + self.led_set_status_2 = True + subprocess.run(["gpio", "write", str(self.LED_PIN_blue), "1"]) + + def set_led2_off(self): + self.led_set_status_2 = False + subprocess.run(["gpio", "write", str(self.LED_PIN_blue), "0"]) \ No newline at end of file diff --git a/takway/board/sipeed.py b/takway/board/sipeed.py new file mode 100644 index 0000000..9ab8da6 --- /dev/null +++ b/takway/board/sipeed.py @@ -0,0 +1,58 @@ +import sys +import warnings +import threading +import time +from collections import deque + +from takway.board.base_hd import BaseHardware + +if "gpiod" in sys.modules: + # sipeed MaixSense V329 + import gpiod as gpio +else: + # 如果所有库都不存在,执行默认操作或抛出异常 + # raise ImportError("gpiod package is not available.") + warnings.warn("gpiod package is not available.") + +class V329(BaseHardware): + def __init__(self, hd_trigger='button', hd_detect_threshold=50): + super().__init__(hd_trigger, hd_detect_threshold) + self.button = self.button_init() + + self.init_hd_thread() + + def button_init(self): + PH_BASE = (8-1)*32 #PH + + gpiochip1 = gpio.chip("gpiochip1") + button = gpiochip1.get_line((PH_BASE+5)) + config = gpio.line_request() + config.request_type = gpio.line_request.DIRECTION_INPUT + config.flags = gpio.line_request.FLAG_BIAS_PULL_UP + button.request(config) + return button + + @property + def button_status(self): + return True if self.button.get_value() == 1 else False + + def hd_detection_loop(self): + self.shared_hd_status = False + button_value_list = deque(maxlen=self.hd_detect_threshold) + + while True: + if len(button_value_list) > button_value_list.maxlen: + button_value_list.popleft() + button_value_list.append(self.button_status) + # 记录50个值,如果连续50个值都是True,则认为按钮被按下 + if button_value_list.count(True) == button_value_list.maxlen: + with self.hd_lock: + self.shared_hd_status = True + # 记录50个值,如果连续50个值都是False,则认为按钮被松开 + if button_value_list.count(False) == button_value_list.maxlen: + with self.hd_lock: + 
self.shared_hd_status = False + + + + diff --git a/takway/common_utils.py b/takway/common_utils.py new file mode 100644 index 0000000..3f2795d --- /dev/null +++ b/takway/common_utils.py @@ -0,0 +1,96 @@ + +# ############################################################# # +# format table function +# ############################################################# # + +def format_table(header, rows): + # 计算列宽 + col_width = max(len(str(word)) for row in rows for word in row) + 2 # 最大单词长度 + 2 作为列宽 + # 打印表头 + print("".join(word.ljust(col_width) for word in header)) + # 打印分隔线 + print("".join("-" * col_width for _ in header)) + # 打印内容 + for row in rows: + print("".join(str(word).ljust(col_width) for word in row)) + +# ############################################################# # +# encode and decode bytes and string +# ############################################################# # + +import base64 +def encode_bytes2str(data): + # 将字节串编码为Base64 + if data is None: + return None + return base64.b64encode(data).decode('utf-8') + +def decode_str2bytes(data): + # 将Base64编码的字节串解码为字节串 + if data is None: + return None + return base64.b64decode(data.encode('utf-8')) + +import re +def split_sentences(text: str): + # 定义中文标点符号的正则表达式 + pattern = r'[\。\,\、\;\:\?\!\“\”\(\)\《\》]+' + # 使用正则表达式分割字符串 + sentences = re.split(pattern, text) + # 过滤掉空字符串 + sentences = [sentence for sentence in sentences if sentence] + return sentences +''' +# 示例文本 +text = "今天天气真好,我们去公园玩吧!你觉得怎么样?好的,那就这么定了。" +# 调用函数进行断句 +sentences = split_sentences(text) + +print(sentences) +''' + +def split_chinese_text(text: str, return_patch=False): + # 定义中文标点符号集合 + punctuations = set('。!?,;:、“”()《》【】') + # 初始化断句结果列表和标点符号列表 + sentences = [] + punctuation_list = [] + + text_patch = [] + + start = 0 # 断句开始位置 + for i, char in enumerate(text): + if char in punctuations: + # 如果当前字符是标点符号,则进行断句,并记录标点符号 + sentences.append(text[start:i+1]) + punctuation_list.append(char) + start = i + 1 # 更新断句开始位置 + + # 处理最后一句(如果最后一句后没有标点符号) + if start < len(text): + sentences.append(text[start:]) + + + if return_patch: + if len(punctuation_list) == 0: + return [text], False # 有残留语句 + elif len(sentences) == len(punctuation_list): + return [''.join(sentences)], True + else: + return [''.join(sentences[:-1]), sentences[-1]], True + return sentences, punctuation_list +''' +# 示例文本 +text = "你好,世界!今天天气怎么样?希望你有一个美好的一天。" +sentences, punctuation_list = split_chinese_text(text) + +print("断句结果:", sentences) +print("标点符号列表:", punctuation_list) +''' + +def remove_brackets_and_contents(text): + # 使用sub函数替换匹配的文本为空字符串 + result = re.sub(r'\(.*?\)', '', text) + result = re.sub(r'\(.*?\)', '', result) + result = re.sub(r'\【.*?\】', '', result) + return result diff --git a/takway/savePath/example_recording.npy b/takway/savePath/example_recording.npy new file mode 100644 index 0000000..5bb6c85 Binary files /dev/null and b/takway/savePath/example_recording.npy differ diff --git a/takway/stt/modified_funasr.py b/takway/stt/modified_funasr.py index 5628aad..abdb0ca 100644 --- a/takway/stt/modified_funasr.py +++ b/takway/stt/modified_funasr.py @@ -10,6 +10,8 @@ class ModifiedRecognizer(FunAutoSpeechRecognizer): use_punct=True, use_emotion=False, use_speaker_ver=True): + + # 创建基础的 funasr模型,用于语音识别,识别出不带标点的句子 super().__init__( model_path="paraformer-zh-streaming", device="cuda", @@ -19,18 +21,28 @@ class ModifiedRecognizer(FunAutoSpeechRecognizer): chunk_ms=480, encoder_chunk_look_back=4, decoder_chunk_look_back=1) + + # 记录是否具备附加功能 self.use_punct = use_punct self.use_emotion = use_emotion 
self.use_speaker_ver = use_speaker_ver + # 增加标点模型 if use_punct: self.puctuation_model = Punctuation(**CTTRANSFORMER) + + # 情绪识别模型 if use_emotion: self.emotion_model = Emotion(**FUNASRFINETUNE) + + # 说话人识别模型 if use_speaker_ver: self.speaker_ver_model = speaker_verfication(**ERES2NETV2) def initialize_speaker(self, speaker_1_wav): + """ + 用于说话人识别,将输入的音频(speaker_1_wav)设立为目标说话人,并将其特征保存本地 + """ if not self.use_speaker_ver: raise NotImplementedError("no access") if speaker_1_wav.endswith(".npy"): @@ -45,35 +57,52 @@ class ModifiedRecognizer(FunAutoSpeechRecognizer): def speaker_ver(self, speaker_2_wav): + """ + 用于说话人识别,判断输入音频是否为目标说话人, + 是返回True,不是返回False + """ if not self.use_speaker_ver: raise NotImplementedError("no access") if not hasattr(self, "save_speaker_path"): raise NotImplementedError("please initialize speaker first") # pdb.set_trace() + # self.speaker_ver_model.verfication 返回值为字符串 'yes' / 'no' return self.speaker_ver_model.verfication(base_emb=self.save_speaker_path, speaker_2_wav=speaker_2_wav) == 'yes' def recognize(self, audio_data): + """ + 非流式语音识别,返回识别出的文本,返回值类型 str + """ audio_data = self.check_audio_type(audio_data) + # 说话人识别 if self.use_speaker_ver: if self.speaker_ver_model.verfication(self.save_speaker_path, speaker_2_wav=audio_data) == 'no': return "Other People" + # 语音识别 result = self.asr_model.generate(input=audio_data, batch_size_s=300, hotword=self.hotwords) text = '' for res in result: text += res['text'] + + # 添加标点 if self.use_punct: text = self.puctuation_model.process(text+'#', append_period=False).replace('#', '') return text def recognize_emotion(self, audio_data): + """ + 情感识别,返回值为: + 1. 如果说话人非目标说话人,返回字符串 "Other People" + 2. 如果说话人为目标说话人,返回字典{"Labels": List[str], "scores": List[int]} + """ audio_data = self.check_audio_type(audio_data) if self.use_speaker_ver: @@ -93,14 +122,20 @@ class ModifiedRecognizer(FunAutoSpeechRecognizer): audio_data: bytes or numpy array, partial audio data is_end: bool, whether the audio data is the end of a sentence auto_det_end: bool, whether to automatically detect the end of a audio data + + 流式语音识别,返回值为: + 1. 如果说话人非目标说话人,返回字符串 "Other People" + 2. 
如果说话人为目标说话人,返回字典{"test": List[str], "is_end": boolean} """ audio_data = self.check_audio_type(audio_data) + # 说话人识别 if self.use_speaker_ver: if self.speaker_ver_model.verfication(self.save_speaker_path, speaker_2_wav=audio_data) == 'no': return "Other People" + # 语音识别 text_dict = dict(text=[], is_end=is_end) if self.audio_cache is None: @@ -145,6 +180,8 @@ class ModifiedRecognizer(FunAutoSpeechRecognizer): except ValueError as e: print(f"ValueError: {e}") continue + + # 增添标点 if self.use_punct: text_dict['text'].append(self.puctuation_model.process(self.text_postprecess(res[0], data_id='text'), cache=text_dict)) else: diff --git a/takway/stt/speaker_ver_utils.py b/takway/stt/speaker_ver_utils.py index 838393f..9d8f560 100644 --- a/takway/stt/speaker_ver_utils.py +++ b/takway/stt/speaker_ver_utils.py @@ -10,7 +10,7 @@ ERES2NETV2 = { } # 保存 embedding 的路径 -DEFALUT_SAVE_PATH = r"D:\python\irving\takway_base-main\examples" +DEFALUT_SAVE_PATH = r".\takway\savePath" class speaker_verfication: def __init__(self, diff --git a/takway/stt/vosk_utils.py b/takway/stt/vosk_utils.py deleted file mode 100644 index b67cfa5..0000000 --- a/takway/stt/vosk_utils.py +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# ####################################################### # -# VOSKAutoSpeechRecognizer -# ####################################################### # -import json -import wave -import io -import os -from vosk import Model, KaldiRecognizer, SetLogLevel -from .base_stt import STTBase -from ..common_utils import decode_str2bytes - -class VOSKAutoSpeechRecognizer(STTBase): - def __init__(self, model_path="vosk-model-small-cn-0.22", RATE=16000, cfg_path=None, efficent_mode=True, debug=False): - super().__init__(self, model_path=model_path, RATE=RATE, cfg_path=cfg_path, debug=debug) - self.asr_model = AutoModel(model="paraformer-zh-streaming") - - self.apply_asr_config(self.asr_cfg) - - def recognize_keywords(self, audio_data, partial_size=None, queue=None): - """recognize keywords in audio data""" - audio_data = self.check_audio_type(audio_data) - if partial_size is None: - rec_result = self.recognize(audio_data, queue) - rec_text = self.result_postprecess(rec_result) - else: - rec_result = self.partial_recognize(audio_data, partial_size, queue) - rec_text = self.result_postprecess(rec_result, 'partial') - print(f"rec_text: {rec_text}") - if rec_text != '': - print(f"rec_text: {rec_text}") - if any(keyword in rec_text for keyword in self.keywords): - print("Keyword detected.") - return True, rec_text - else: - return False, None - - def recognize(self, audio_data, queue=None): - """recognize audio data to text""" - audio_data = self.check_audio_type(audio_data) - self.asr.AcceptWaveform(audio_data) - result = json.loads(self.asr.FinalResult()) - # TODO: put result to queue - return result - - def partial_recognize(self, audio_data, partial_size=1024, queue=None): - """recognize partial result""" - audio_data = self.check_audio_type(audio_data) - text_dict = dict( - text=[], - partial=[], - final=[], - is_end=False) - # 逐个分割音频数据进行识别 - for i in range(0, len(audio_data), partial_size): - # print(f"partial data: {i} - {i+partial_size}") - data = audio_data[i:i+partial_size] - if len(data) == 0: - break - if self.asr.AcceptWaveform(data): - result = json.loads(self.asr.Result()) - if result['text'] != '': - text_dict['text'].append(result['text']) - if queue is not None: - queue.put(('stt_info', text_dict)) - # print(f"text result: {result}") - else: - result = 
json.loads(self.asr.PartialResult()) - if result['partial'] != '': - # text_dict['partial'].append(result['partial']) - text_dict['partial'] = [result['partial']] - if queue is not None: - queue.put(('stt_info', text_dict)) - # print(f"partial result: {result}") - - # final recognize - final_result = json.loads(self.asr.FinalResult()) - if final_result['text'] != '': - text_dict['final'].append(final_result['text']) - text_dict['text'].append(final_result['text']) - - text_dict['is_end'] = True - - print(f"final dict: {text_dict}") - if queue is not None: - queue.put(('stt_info', text_dict)) - return text_dict - - -if __name__ == "__main__": - ''' - wav_file_path = "recording.wav" - - # You can set log level to -1 to disable debug messages - SetLogLevel(0) - - model = Model(model_path="vosk-model-small-cn-0.22") - - # 调用函数进行录音 - # record_audio(wav_file_path) - data = record_audio() - - # 调用函数进行音频转写 - result = audio_to_text(data, model) - - print("-------------") - print(result) - ''' - from takway.audio_utils import Recorder - rec = Recorder() - - return_type = 'bytes' - data = rec.record(return_type) - print(type(data)) - - asr = AutoSpeechRecognizer() - # asr.recognize(data) - asr.add_keyword("你好") - asr.recognize_keywords(data) \ No newline at end of file
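
Note on the new capture paths: demo.py only exercises file-based recognition, while the added audio_utils.py also provides VAD capture (VADRecorder.vad_record) and Porcupine wake-word detection (PicovoiceRecorder.record_picovoice). The sketch below shows one way these could be chained with ModifiedRecognizer. It is an untested illustration, not part of the diff: the access key and keyword are placeholders, and it assumes a working microphone, the `keyboard` package for the default trigger, `webrtcvad` and `pvporcupine` installed, and a CUDA device (ModifiedRecognizer hard-codes device="cuda").

    from takway.audio_utils import PicovoiceRecorder
    from takway.stt.modified_funasr import ModifiedRecognizer

    # Placeholders / assumptions: replace the access key with your own; the keyword
    # must be one of pvporcupine.KEYWORDS (or pass keyword_paths for custom models).
    recorder = PicovoiceRecorder(
        access_key="YOUR_PICOVOICE_ACCESS_KEY",
        keywords=["picovoice"],
        vad_sensitivity=1,   # forwarded to VADRecorder
        frame_duration=30,   # VAD frame size in ms
    )
    asr = ModifiedRecognizer(use_punct=True, use_emotion=False, use_speaker_ver=False)

    recorder.record_picovoice()        # block until the wake word is heard
    speech = recorder.vad_record()     # then record until silence is detected (returns bytes)
    text_dict = asr.streaming_recognize(speech, auto_det_end=True)
    print(text_dict if isinstance(text_dict, str) else "".join(text_dict["text"]))

Because vad_record reads from the recorder's already-open input stream, no separate BaseRecorder is needed for the capture step.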