TakwayDisplayPlatform/test/test.py

68 lines
2.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import base64
from datetime import datetime
import io
from websocket import create_connection
data = {
"text": "",
"audio": "",
"meta_info": {
"session_id":"a36c9bb4-e813-4f0e-9c75-18e049c60f48",
"stream": True,
"voice_synthesize": True,
"is_end": False,
"encoding": "raw"
}
}
def read_pcm_file_in_chunks(chunk_size):
with open('example_recording.wav', 'rb') as pcm_file:
while True:
data = pcm_file.read(chunk_size)
if not data:
break
yield data
def send_audio_chunk(websocket, chunk):
# 将PCM数据进行Base64编码
encoded_data = base64.b64encode(chunk).decode('utf-8')
# 更新data字典中的"audio"键的值为Base64编码后的音频数据
data["audio"] = encoded_data
# 将JSON数据对象转换为JSON字符串
message = json.dumps(data)
# 发送JSON字符串到WebSocket接口
websocket.send(message)
def send_json():
websocket = create_connection('ws://114.214.236.207:7878/api/chat/streaming/temporary')
chunks = read_pcm_file_in_chunks(1024) # 读取PCM文件并生成数据块
for chunk in chunks:
send_audio_chunk(websocket, chunk)
# print("发送数据块:", len(chunk))
import time; time.sleep(0.01)
# threading.Event().wait(0.01) # 等待0.01秒
# 设置data字典中的"is_end"键为True表示音频流结束
data["meta_info"]["is_end"] = True
# 发送最后一个数据块和流结束信号
send_audio_chunk(websocket, b'') # 发送空数据块表示结束
# 等待并打印接收到的数据
print("等待接收:", datetime.now())
audio_bytes = b''
while True:
data_ws = websocket.recv()
try:
message_json = json.loads(data_ws)
print(message_json) # 打印接收到的消息
if message_json["type"] == "close":
break # 如果没有接收到消息,则退出循环
except Exception as e:
audio_bytes += data_ws
print(e)
print("接收完毕:", datetime.now())
websocket.close()
# 启动事件循环
send_json()