TakwayDisplayPlatform/test/test.py

99 lines
3.3 KiB
Python
Raw Normal View History

2024-06-09 22:54:13 +08:00
import json
import time
2024-06-09 22:54:13 +08:00
import base64
from datetime import datetime
from websocket import create_connection
data = {
"text": "",
"audio": "",
"meta_info": {
"session_id":"469f4a99-12a5-45a6-bc91-353df07423b6",
2024-06-09 22:54:13 +08:00
"stream": True,
"voice_synthesize": True,
"is_end": False,
"encoding": "raw"
}
}
def read_pcm_file_in_chunks(chunk_size):
with open('example_recording.wav', 'rb') as pcm_file:
while True:
data = pcm_file.read(chunk_size)
if not data:
break
yield data
def send_audio_chunk(websocket, chunk):
# 将PCM数据进行Base64编码
encoded_data = base64.b64encode(chunk).decode('utf-8')
# 更新data字典中的"audio"键的值为Base64编码后的音频数据
data["audio"] = encoded_data
# 将JSON数据对象转换为JSON字符串
message = json.dumps(data)
# 发送JSON字符串到WebSocket接口
websocket.send(message)
def send_json():
websocket = create_connection('ws://114.214.236.207:7878/api/chat/streaming/temporary')
chunks = read_pcm_file_in_chunks(1024) # 读取PCM文件并生成数据块
for chunk in chunks:
send_audio_chunk(websocket, chunk)
# print("发送数据块:", len(chunk))
import time; time.sleep(0.01)
# threading.Event().wait(0.01) # 等待0.01秒
# 设置data字典中的"is_end"键为True表示音频流结束
data["meta_info"]["is_end"] = True
# 发送最后一个数据块和流结束信号
send_audio_chunk(websocket, b'') # 发送空数据块表示结束
# 等待并打印接收到的数据
wait_start_time = time.time()
print("开始等待返回数据:", datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
2024-06-09 22:54:13 +08:00
audio_bytes = b''
is_receive_first_frame = False
2024-06-09 22:54:13 +08:00
while True:
data_ws = websocket.recv()
if not is_receive_first_frame:
wait_end_time = time.time()
print("等待时间:", wait_end_time - wait_start_time)
is_receive_first_frame = True
2024-06-09 22:54:13 +08:00
try:
message_json = json.loads(data_ws)
print(message_json) # 打印接收到的消息
if message_json["type"] == "close":
break # 如果没有接收到消息,则退出循环
except Exception as e:
audio_bytes += data_ws
# print(e)
2024-06-09 22:54:13 +08:00
print("接收完毕:", datetime.now())
websocket.close()
# 模拟检测到杂音时只发一帧end
def send_one_end_frame():
websocket = create_connection('ws://114.214.236.207:7878/api/chat/streaming/temporary')
data["meta_info"]["is_end"] = True
send_audio_chunk(websocket, b'') # 发送空数据块表示结束
audio_bytes = b''
is_receive_first_frame = False
while True:
data_ws = websocket.recv()
if not is_receive_first_frame:
wait_end_time = time.time()
is_receive_first_frame = True
try:
message_json = json.loads(data_ws)
print(message_json) # 打印接收到的消息
if message_json["type"] == "close":
break # 如果没有接收到消息,则退出循环
except Exception as e:
audio_bytes += data_ws
# print(e)
websocket.close()
2024-06-09 22:54:13 +08:00
# 启动事件循环
# send_json()
send_one_end_frame()