update: 更新程序,增加了并发测试程序

This commit is contained in:
killua4396 2024-05-13 15:14:54 +08:00
parent 4974570f20
commit bd893a4599
5 changed files with 79 additions and 39 deletions

View File

@ -1,31 +1,10 @@
from tests.unit_test.user_test import UserServiceTest
from tests.unit_test.character_test import CharacterServiceTest
from tests.unit_test.chat_test import ChatServiceTest
from tests.unit_test.user_test import user_test
from tests.unit_test.character_test import character_test
from tests.unit_test.chat_test import chat_test
import asyncio
if __name__ == '__main__':
user_service_test = UserServiceTest()
character_service_test = CharacterServiceTest()
chat_service_test = ChatServiceTest()
user_service_test.test_user_create()
user_service_test.test_user_update()
user_service_test.test_user_query()
user_service_test.test_hardware_bind()
user_service_test.test_hardware_unbind()
user_service_test.test_user_delete()
character_service_test.test_character_create()
character_service_test.test_character_update()
character_service_test.test_character_query()
character_service_test.test_character_delete()
chat_service_test.test_create_chat()
chat_service_test.test_session_id_query()
chat_service_test.test_session_content_query()
chat_service_test.test_session_update()
asyncio.run(chat_service_test.test_chat_temporary())
asyncio.run(chat_service_test.test_chat_lasting())
asyncio.run(chat_service_test.test_voice_call())
chat_service_test.test_chat_delete()
user_test()
character_test()
chat_test()
print("全部测试成功")

View File

@ -66,9 +66,14 @@ class CharacterServiceTest:
else:
raise Exception("角色删除测试失败")
if __name__ == '__main__':
def character_test():
character_service_test = CharacterServiceTest()
character_service_test.test_character_create()
character_service_test.test_character_update()
character_service_test.test_character_query()
character_service_test.test_character_delete()
if __name__ == '__main__':
character_test()

View File

@ -293,7 +293,6 @@ class ChatServiceTest:
await asyncio.gather(audio_stream(websocket))
#测试删除聊天
def test_chat_delete(self):
url = f"{self.socket}/chats/{self.user_character_id}"
@ -314,16 +313,17 @@ class ChatServiceTest:
raise Exception("角色删除测试失败")
if __name__ == '__main__':
def chat_test():
chat_service_test = ChatServiceTest()
chat_service_test.test_create_chat()
chat_service_test.test_session_id_query()
chat_service_test.test_session_content_query()
chat_service_test.test_session_update()
asyncio.run(chat_service_test.test_chat_temporary())
asyncio.run(chat_service_test.test_chat_lasting())
asyncio.run(chat_service_test.test_voice_call())
# asyncio.run(chat_service_test.test_chat_lasting())
# asyncio.run(chat_service_test.test_voice_call())
chat_service_test.test_chat_delete()
if __name__ == '__main__':
chat_test()

View File

@ -0,0 +1,12 @@
from chat_test import chat_test
import multiprocessing
if __name__ == '__main__':
processes = []
for _ in range(2):
p = multiprocessing.Process(target=chat_test)
processes.append(p)
p.start()
for p in processes:
p.join()

View File

@ -66,7 +66,7 @@ class UserServiceTest:
mac = "08:00:20:0A:8C:6G"
payload = json.dumps({
"mac":mac,
"user_id":1,
"user_id":self.id,
"firmware":"v1.0",
"model":"香橙派"
})
@ -88,12 +88,56 @@ class UserServiceTest:
else:
raise Exception("硬件解绑测试失败")
if __name__ == '__main__':
def test_hardware_bind_change(self):
url = f"{self.socket}/users/hardware/{self.hd_id}/bindchange"
payload = json.dumps({
"user_id" : self.id
})
headers = {
'Content-Type': 'application/json'
}
response = requests.request("PUT", url, headers=headers, data=payload)
if response.status_code == 200:
print("硬件换绑测试成功")
else:
raise Exception("硬件换绑测试失败")
def test_hardware_update(self):
url = f"{self.socket}/users/hardware/{self.hd_id}/info"
payload = json.dumps({
"mac":"08:00:20:0A:8C:6G",
"firmware":"v1.0",
"model":"香橙派"
})
headers = {
'Content-Type': 'application/json'
}
response = requests.request("PUT", url, headers=headers, data=payload)
if response.status_code == 200:
print("硬件信息更新测试成功")
else:
raise Exception("硬件信息更新测试失败")
def test_hardware_query(self):
url = f"{self.socket}/users/hardware/{self.hd_id}"
response = requests.request("GET", url)
if response.status_code == 200:
print("硬件查询测试成功")
else:
raise Exception("硬件查询测试失败")
def user_test():
user_service_test = UserServiceTest()
user_service_test.test_user_create()
user_service_test.test_user_update()
user_service_test.test_user_query()
user_service_test.test_hardware_bind()
user_service_test.test_hardware_bind_change()
user_service_test.test_hardware_update()
user_service_test.test_hardware_query()
user_service_test.test_hardware_unbind()
user_service_test.test_user_delete()
if __name__ == '__main__':
user_test()