forked from killua/TakwayPlatform
143 lines
4.7 KiB
Python
143 lines
4.7 KiB
Python
import requests
|
|
import json
|
|
import uuid
|
|
|
|
|
|
class UserServiceTest:
    """Sequential smoke test for the user and hardware HTTP endpoints.

    Every ``test_*`` method sends a single request, prints a success
    message when the response status is 200 and raises ``Exception``
    otherwise.

    The methods share state and are order dependent:

    * ``test_user_create`` stores the created user's id in ``self.id``.
    * ``test_hardware_bind`` stores the bound hardware id in ``self.hd_id``.

    Methods that build their URL or payload from those attributes must be
    called after the method that sets them.
    """

    # Header sent with every request that carries a JSON body.
    _JSON_HEADERS = {'Content-Type': 'application/json'}

    # NOTE(review): the last octet "6G" is not valid hexadecimal. The value
    # is kept unchanged from the original test data -- confirm whether the
    # server validates MAC addresses before "fixing" it.
    _TEST_MAC = "08:00:20:0A:8C:6G"

    def __init__(self, socket="http://127.0.0.1:8001", *, timeout=10):
        """Remember where the service lives and how long to wait for it.

        Args:
            socket: Base URL of the service (scheme, host and port) that
                every endpoint path is appended to.
            timeout: Seconds passed to ``requests`` as the per-request
                timeout.  Without it a stalled server would block the
                test run indefinitely.
        """
        self.socket = socket
        self.timeout = timeout

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #
    def _call(self, method, url, ok_msg, fail_msg, payload=None):
        """Send one request and enforce the "status must be 200" contract.

        Args:
            method: HTTP verb handed to ``requests.request``.
            url: Fully built target URL.
            ok_msg: Text printed when the status code is 200.
            fail_msg: Text of the ``Exception`` raised for any other status.
            payload: Optional dict; when given it is JSON-encoded and sent
                as the body together with a JSON ``Content-Type`` header.

        Returns:
            The ``requests`` response object of the successful call.

        Raises:
            Exception: If the response status code is not 200.
        """
        kwargs = {"timeout": self.timeout}
        if payload is not None:
            # Body-less requests (GET / DELETE) are sent without headers,
            # exactly as the individual tests did before this helper existed.
            kwargs["headers"] = dict(self._JSON_HEADERS)
            kwargs["data"] = json.dumps(payload)

        response = requests.request(method, url, **kwargs)
        if response.status_code != 200:
            raise Exception(fail_msg)
        print(ok_msg)
        return response

    @staticmethod
    def _user_payload():
        """Build the user body shared by create and update (fresh open_id)."""
        return {
            "open_id": str(uuid.uuid4()),
            "username": "test_user",
            "avatar_id": "0",
            "tags": "[]",
            "persona": "{}",
        }

    # ------------------------------------------------------------------ #
    # User endpoints
    # ------------------------------------------------------------------ #
    def test_user_create(self):
        """POST /users and keep the returned ``user_id`` in ``self.id``."""
        response = self._call(
            "POST",
            f"{self.socket}/users",
            "用户创建测试成功",
            "用户创建测试失败",
            payload=self._user_payload(),
        )
        self.id = response.json()["data"]["user_id"]

    def test_user_update(self):
        """PUT /users/<id> with a new random ``open_id``."""
        self._call(
            "PUT",
            f"{self.socket}/users/{self.id}",
            "用户更新测试成功",
            "用户更新测试失败",
            payload=self._user_payload(),
        )

    def test_user_query(self):
        """GET /users/<id>."""
        self._call(
            "GET",
            f"{self.socket}/users/{self.id}",
            "用户查询测试成功",
            "用户查询测试失败",
        )

    def test_user_delete(self):
        """DELETE /users/<id>."""
        self._call(
            "DELETE",
            f"{self.socket}/users/{self.id}",
            "用户删除测试成功",
            "用户删除测试失败",
        )

    # ------------------------------------------------------------------ #
    # Hardware endpoints
    # ------------------------------------------------------------------ #
    def test_hardware_bind(self):
        """POST /users/hardware and keep ``hardware_id`` in ``self.hd_id``."""
        response = self._call(
            "POST",
            f"{self.socket}/users/hardware",
            "硬件绑定测试成功",
            "硬件绑定测试失败",
            payload={
                "mac": self._TEST_MAC,
                "user_id": self.id,
                "firmware": "v1.0",
                "model": "香橙派",
            },
        )
        self.hd_id = response.json()["data"]['hardware_id']

    def test_hardware_unbind(self):
        """DELETE /users/hardware/<hd_id>."""
        self._call(
            "DELETE",
            f"{self.socket}/users/hardware/{self.hd_id}",
            "硬件解绑测试成功",
            "硬件解绑测试失败",
        )

    def test_hardware_bind_change(self):
        """PUT /users/hardware/<hd_id>/bindchange with the current user id."""
        self._call(
            "PUT",
            f"{self.socket}/users/hardware/{self.hd_id}/bindchange",
            "硬件换绑测试成功",
            "硬件换绑测试失败",
            payload={"user_id": self.id},
        )

    def test_hardware_update(self):
        """PUT /users/hardware/<hd_id>/info with mac, firmware and model."""
        self._call(
            "PUT",
            f"{self.socket}/users/hardware/{self.hd_id}/info",
            "硬件信息更新测试成功",
            "硬件信息更新测试失败",
            payload={
                "mac": self._TEST_MAC,
                "firmware": "v1.0",
                "model": "香橙派",
            },
        )

    def test_hardware_query(self):
        """GET /users/hardware/<hd_id>."""
        self._call(
            "GET",
            f"{self.socket}/users/hardware/{self.hd_id}",
            "硬件查询测试成功",
            "硬件查询测试失败",
        )
|
|
|
|
def user_test():
    """Run every user/hardware check once, in dependency order.

    A single ``UserServiceTest`` built with its default arguments is used
    for the whole run.  The order matters: the user is created first, the
    hardware is bound before the hardware checks, and the hardware is
    unbound before the user is finally deleted.  The first failing step
    raises and aborts the remaining ones.
    """
    tester = UserServiceTest()
    ordered_steps = (
        "test_user_create",
        "test_user_update",
        "test_user_query",
        "test_hardware_bind",
        "test_hardware_bind_change",
        "test_hardware_update",
        "test_hardware_query",
        "test_hardware_unbind",
        "test_user_delete",
    )
    for step_name in ordered_steps:
        getattr(tester, step_name)()
|
|
|
|
|
|
# Script entry point: run the full user-service test sequence only when this
# file is executed directly, not when it is imported.
if __name__ == "__main__":
    user_test()