From 21f9c86c462259a62a1e07343e6c80b29ae82674 Mon Sep 17 00:00:00 2001 From: benjamin <1286884552@qq.com> Date: Mon, 24 Jun 2024 11:56:00 +0800 Subject: [PATCH] feat: mqtt audio publish&subscribe --- mqtt/publish_audio.py | 23 ++++++++++++++++ mqtt/publish_audio_multithread.py | 33 +++++++++++++++++++++++ mqtt/subscribe_audio.py | 33 +++++++++++++++++++++++ mqtt/subscribe_audio_multithread.py | 42 +++++++++++++++++++++++++++++ 4 files changed, 131 insertions(+) create mode 100644 mqtt/publish_audio.py create mode 100644 mqtt/publish_audio_multithread.py create mode 100644 mqtt/subscribe_audio.py create mode 100644 mqtt/subscribe_audio_multithread.py diff --git a/mqtt/publish_audio.py b/mqtt/publish_audio.py new file mode 100644 index 0000000..6c06128 --- /dev/null +++ b/mqtt/publish_audio.py @@ -0,0 +1,23 @@ +import paho.mqtt.client as mqtt + +# MQTT Broker信息 +broker = '127.0.0.1' +port = 1883 +topic = 'audio/test' + +# 音频文件路径 +audio_file_path = 'tmp2.wav' + +def publish_audio(): + client = mqtt.Client() + client.connect(broker, port) + + with open(audio_file_path, 'rb') as audio_file: + audio_data = audio_file.read() + + client.publish(topic, audio_data) + client.disconnect() + +if __name__ == "__main__": + publish_audio() + print("Audio published successfully.") diff --git a/mqtt/publish_audio_multithread.py b/mqtt/publish_audio_multithread.py new file mode 100644 index 0000000..f7c6f37 --- /dev/null +++ b/mqtt/publish_audio_multithread.py @@ -0,0 +1,33 @@ +import paho.mqtt.client as mqtt +import threading +import time + +# MQTT Broker信息 +broker = '127.0.0.1' +port = 1883 +topic = 'audio/test' + +# 音频文件路径列表 +audio_file_paths = ['tmp2.wav', 'tmp3.wav', 'tmp4.wav'] # 添加多个音频文件路径 + +def publish_audio(file_path): + client = mqtt.Client() + client.connect(broker, port) + + with open(file_path, 'rb') as audio_file: + audio_data = audio_file.read() + + client.publish(topic, audio_data) + client.disconnect() + print(f"Audio from {file_path} published successfully.") + +if __name__ == "__main__": + threads = [] + for file_path in audio_file_paths: + thread = threading.Thread(target=publish_audio, args=(file_path,)) + thread.start() + threads.append(thread) + time.sleep(1) # 可选:给每个线程一些间隔时间 + + for thread in threads: + thread.join() diff --git a/mqtt/subscribe_audio.py b/mqtt/subscribe_audio.py new file mode 100644 index 0000000..3e6c4a7 --- /dev/null +++ b/mqtt/subscribe_audio.py @@ -0,0 +1,33 @@ +import paho.mqtt.client as mqtt +from pydub import AudioSegment +import io + +# MQTT Broker信息 +broker = '127.0.0.1' +port = 1883 +topic = 'audio/test' +output_file_path = 'received_audio.wav' + +def on_connect(client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + client.subscribe(topic) + +def on_message(client, userdata, msg): + print("Audio received") + audio_data = msg.payload + audio = AudioSegment.from_file(io.BytesIO(audio_data), format="wav") + # 将音频保存为文件 + with open(output_file_path, 'wb') as f: + f.write(audio_data) + print(f"Audio saved as {output_file_path}") + +def subscribe_audio(): + client = mqtt.Client() + client.on_connect = on_connect + client.on_message = on_message + + client.connect(broker, port) + client.loop_forever() + +if __name__ == "__main__": + subscribe_audio() diff --git a/mqtt/subscribe_audio_multithread.py b/mqtt/subscribe_audio_multithread.py new file mode 100644 index 0000000..710b234 --- /dev/null +++ b/mqtt/subscribe_audio_multithread.py @@ -0,0 +1,42 @@ +import paho.mqtt.client as mqtt +from pydub import AudioSegment +import io +import threading + +# MQTT Broker信息 +broker = '127.0.0.1' +port = 1883 +topic = 'audio/test' +output_file_path = 'received_audio.wav' + +def on_connect(client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + client.subscribe(topic) + +def handle_audio(audio_data): + # 将音频保存为文件 + with open(output_file_path, 'wb') as f: + f.write(audio_data) + print(f"Audio saved as {output_file_path}") + +def on_message(client, userdata, msg): + print("Audio received") + audio_data = msg.payload + # 使用线程来处理音频 + audio_thread = threading.Thread(target=handle_audio, args=(audio_data,)) + audio_thread.start() + +def subscribe_audio(): + client = mqtt.Client() + client.on_connect = on_connect + client.on_message = on_message + + client.connect(broker, port) + client.loop_forever() + +def mqtt_thread(): + subscribe_audio() + +if __name__ == "__main__": + mqtt_thread = threading.Thread(target=mqtt_thread) + mqtt_thread.start()