yamlapi接口测试框架
2021-01-13 07:31
标签:end itertools 公共属性 执行命令 规则 for循环 odi int www 1、思路: yamlapi支持unittest与pytest两种运行模式, yamlapi即为yaml文件+api测试的缩写, 可以看作是一个脚手架工具, 可以快速生成项目的各个目录与文件, 测试人员只需维护一份或者多份yaml文件即可, 不需要大量写代码。 2、安装: https://pypi.org/ 可在首页搜索“yamlapi”, 或者直接访问项目主页: https://pypi.org/project/yamlapi/ pip install yamlapi # 安装 yamlapi -h(或yamlapi --help) # 查看参数信息 yamlapi -v(或yamlapi --v) # 查看版本号 pip install -U yamlapi # 安装最新版 yamlapi --p=项目名称 # 创建项目 # 例如在某个路径下执行命令:yamlapi --p=demo_project pip uninstall yamlapi # 卸载 3、工程示例: README.md文件: demo_test.py文件: project_config.py文件: 4、运行: unittest模式: python+测试文件名+环境缩写 python ./case/demo_test.py dev python ./case/demo_test.py test python ./case/demo_test.py pre python ./case/demo_test.py formal pytest模式: pytest -v 完整命令为: pytest -v --reruns 3 --reruns-delay 3 --timeout=60 --junitxml=./report/report.xml --html=./report/report.html --self-contained-html --alluredir=./report/allure-report 已写进pytest.ini配置文件 yamlapi接口测试框架 标签:end itertools 公共属性 执行命令 规则 for循环 odi int www 原文地址:https://www.cnblogs.com/yjlch1016/p/12286091.html# honeybee
(蜜蜂)接口测试框架
# 一、思路
1、采用requests+unittest+ddt+PyMySQL+BeautifulReport+demjson+loguru+PyYAML+pytest+pytest-html+allure-pytest+pytest-rerunfailures+pytest-sugar+pytest-timeout
2、requests是发起HTTP请求的第三方库
3、unittest是Python自带的单元测试工具
4、ddt是数据驱动的第三方库
5、PyMySQL是连接MySQL的第三方库
6、BeautifulReport是生成html测试报告的第三方库
7、demjson是解析json的第三方库
8、loguru是记录日志的第三方库
9、PyYAML是读写yaml文件的第三方库
10、pytest是单元测试的第三方库
11、pytest-html是生成html测试报告的插件
12、allure-pytest是生成allure测试报告的插件
13、pytest-rerunfailures是失败重跑的插件
14、pytest-sugar是显示进度的插件
15、pytest-timeout是设置超时时间的插件
# 二、目录结构
1、case是测试用例包
2、log是日志目录
3、report是测试报告的目录
4、resource是yaml文件的目录
5、setting是工程的配置文件包
6、tool是常用方法的封装包
# 三、yaml文件说明
1、字段(命名和格式不可修改,顺序可以修改)
case_name: 用例名称
mysql: MySQL查询语句
request_mode: 请求方式
api: 接口
data: 请求体,缩进字典格式或者json格式
headers: 请求头,缩进字典格式或者json格式
query_string: 请求参数,缩进字典格式或者json格式
expected_code: 预期的响应代码
expected_result: 预期的响应结果,-列表格式
regular: 正则,缩进字典格式
>>variable:变量名,-列表格式
>>expression:表达式,-列表格式
2、参数化
正则表达式提取的结果用${变量名}表示,一条用例里面可以有多个
MySQL返回的结果用{__SQL}表示,一条用例里面可以有多个
随机数字用{__RN位数},一条用例里面可以有多个
随机英文字母用{__RL位数},一条用例里面可以有多个
以上4种类型在一条用例里面可以混合使用
${变量名}的作用域是全局的,其它3种的作用域仅限该条用例
1 import json
2 import re
3 import os
4 import sys
5 import unittest
6 from itertools import chain
7 from time import sleep
8
9 import ddt
10 import demjson
11 import requests
12
13 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
14 sys.path.append(BASE_DIR)
15
16 from setting.project_config import *
17 from tool.connect_mysql import query_mysql
18 from tool.create_random import create_random_number, create_random_letters
19 from tool.read_write_yaml import read_yaml, write_yaml
20 from tool.beautiful_report_run import beautiful_report_run
21
22
23 @ddt.ddt
24 # 声明使用ddt
25 class DemoTest(unittest.TestCase):
26 temporary_yaml = yaml_path + "/temporary.yaml"
27 if os.path.isfile(temporary_yaml):
28 # 如果临时yaml文件存在
29 os.remove(temporary_yaml)
30 # 删除之
31 demo_one_list = read_yaml("/demo_one.yaml")
32 demo_two_list = read_yaml("/demo_two.yaml")
33 demo_three_list = read_yaml("/demo_three.yaml")
34 temporary_list = demo_one_list + demo_two_list + demo_three_list
35 temporary_yaml = yaml_path + write_yaml("/temporary.yaml", temporary_list)
36
37 # 把几个yaml文件合并为一个临时yaml文件
38
39 @classmethod
40 def setUpClass(cls) -> None:
41 cls.variable_result_dict = {}
42 # 定义一个变量名与提取的结果字典
43 # cls.variable_result_dict与self.variable_result_dict都是本类的公共属性
44
45 @ddt.file_data(yaml_path + "/temporary.yaml")
46 # 传入临时yaml文件
47 def test_logistics(self, **kwargs):
48 """
49 测试用例
50 :param kwargs:
51 :return:
52 """
53
54 kwargs = str(kwargs)
55 if "None" in kwargs:
56 kwargs = kwargs.replace("None", "‘‘")
57 kwargs = demjson.decode(kwargs)
58 # 把值为None的替换成‘‘空字符串,因为None无法拼接
59 # demjson.decode()等价于json.loads()反序列化
60
61 case_name = kwargs.get("case_name")
62 # 用例名称
63 self._testMethodDoc = case_name
64 # 测试报告里面的用例描述
65 mysql = kwargs.get("mysql")
66 # mysql查询语句
67 request_mode = kwargs.get("request_mode")
68 # 请求方式
69 api = kwargs.get("api")
70 # 接口
71 if type(api) != str:
72 api = str(api)
73 payload = kwargs.get("data")
74 # 请求体
75 if type(payload) != str:
76 payload = str(payload)
77 headers = kwargs.get("headers")
78 # 请求头
79 if type(headers) != str:
80 headers = str(headers)
81 query_string = kwargs.get("query_string")
82 # 请求参数
83 if type(query_string) != str:
84 query_string = str(query_string)
85 expected_code = kwargs.get("expected_code")
86 # 预期的响应代码
87 expected_result = kwargs.get("expected_result")
88 # 预期的响应结果
89 regular = kwargs.get("regular")
90 # 正则
91
92 logger.info("{}>>>开始执行", case_name)
93
94 if environment == "prd" and mysql != "":
95 self.skipTest("生产环境跳过此用例,请忽略")
96 # 生产环境不能连接MySQL数据库,因此跳过,此行后面的都不会执行
97
98 requests_list = [api, payload, headers, query_string]
99 # 请求数据列表
100
101 for index, value in enumerate(requests_list):
102 # for循环修改requests_list的值
103
104 if self.variable_result_dict:
105 # 如果变量名与提取的结果字典不为空
106 if "$" in value:
107 for key, value_2 in self.variable_result_dict.items():
108 value = value.replace("{" + key + "}", value_2)
109 # replace(old, new)把字符串中的旧字符串替换成正则表达式提取的值
110 value = re.sub("\\$", "", value)
111 # re.sub(old, new, 源字符串)默认全部替换
112 # 如果遇到带有转义的字符被当作特殊字符时,使用双反斜杠\\来转义,或者在引号前面加r
113 else:
114 pass
115
116 if mysql:
117 # 如果mysql查询语句不为空
118 if "$" in mysql:
119 # 有些场景下MySQL查询语句也需要参数化
120 for key, value_2 in self.variable_result_dict.items():
121 mysql = mysql.replace("{" + key + "}", value_2)
122 mysql = re.sub("\\$", "", mysql)
123 mysql_result_tuple = query_mysql(mysql)
124 # mysql查询结果元祖
125 mysql_result_list = list(chain.from_iterable(mysql_result_tuple))
126 # 把二维元祖转换为一维列表
127 if "__SQL" in value:
128 for i in mysql_result_list:
129 if type(i) != str:
130 i = str(i)
131 value = value.replace("{__SQL}", i, 1)
132 # replace(old, new, 替换次数)把字符串中的{__SQL}替换成mysql查询返回的值
133 else:
134 pass
135
136 if "__RN" in value:
137 digit_list = re.findall("{__RN(.+?)}", value)
138 # 获取位数列表
139 for j in digit_list:
140 random_number = create_random_number(int(j))
141 # 调用生成随机数字的方法
142 value = value.replace("{__RN" + j + "}", random_number)
143
144 if "__RL" in value:
145 digit_list = re.findall("{__RL(.+?)}", value)
146 # 获取位数列表
147 for i in digit_list:
148 random_letters = create_random_letters(int(i))
149 # 调用生成随机字母的方法
150 value = value.replace("{__RL" + i + "}", random_letters)
151
152 requests_list[index] = value
153
154 api = requests_list[0]
155 payload = requests_list[1]
156 headers = requests_list[2]
157 query_string = requests_list[3]
158
159 if payload != "":
160 payload = demjson.decode(payload)
161 if headers != "":
162 headers = demjson.decode(headers)
163 if query_string != "":
164 query_string = demjson.decode(query_string)
165
166 url = service_domain + api
167 # 拼接完整地址
168
169 logger.info("请求方式为:{}", request_mode)
170 logger.info("地址为:{}", url)
171 logger.info("请求体为:{}", payload)
172 logger.info("请求头为:{}", headers)
173 logger.info("请求参数为:{}", query_string)
174
175 logger.info("预期的响应代码为:{}", expected_code)
176 logger.info("预期的响应结果为:{}", expected_result)
177
178 response = requests.request(
179 request_mode, url, data=json.dumps(payload),
180 headers=headers, params=query_string, timeout=(9, 15))
181 # 发起HTTP请求
182 # json.dumps()序列化把字典转换成字符串,json.loads()反序列化把字符串转换成字典
183 # data请求体为字符串,headers请求头与params请求参数为字典
184
185 actual_time = response.elapsed.total_seconds()
186 # 实际的响应时间
187 actual_code = response.status_code
188 # 实际的响应代码
189 actual_result_text = response.text
190 # 实际的响应结果(文本格式)
191
192 logger.info("实际的响应代码为:{}", actual_code)
193 logger.info("实际的响应结果为:{}", actual_result_text)
194 logger.info("实际的响应时间为:{}", actual_time)
195
196 if regular:
197 # 如果正则不为空
198 extract_list = []
199 # 定义一个提取结果列表
200 for i in regular["expression"]:
201 regular_result = re.findall(i, actual_result_text)[0]
202 # re.findall(正则表达式, 实际的响应结果)返回一个符合规则的list,取第1个
203 extract_list.append(regular_result)
204 # 把提取结果添加到提取结果列表里面
205
206 temporary_dict = dict(zip(regular["variable"], extract_list))
207 # 把变量列表与提取结果列表转为一个临时字典
208
209 for key, value in temporary_dict.items():
210 self.variable_result_dict[key] = value
211 # 把临时字典合并到变量名与提取的结果字典,已去重
212 else:
213 pass
214
215 for key in list(self.variable_result_dict.keys()):
216 if not self.variable_result_dict[key]:
217 del self.variable_result_dict[key]
218 # 删除变量名与提取的结果字典中为空的键值对
219
220 actual_result_text = re.sub("{|}|\"|\\[|\\]", "", actual_result_text)
221 # 去除{、}、"、[与]
222 actual_result_list = re.split(":|,", actual_result_text)
223 # 把响应文本转为列表,并去除:与,
224
225 if expected_code == actual_code:
226 if set(expected_result) set(actual_result_list):
227 # 预期的响应结果与实际的响应结果是被包含关系
228 # 判断是否是其真子集
229 logger.info("{}>>>执行通过", case_name)
230 else:
231 logger.error("{}>>>执行失败", case_name)
232 self.assertTrue(set(expected_result) set(actual_result_list))
233 # 布尔表达式断言
234 else:
235 logger.error("{}>>>请求失败,请检查域名、路径与请求参数是否正确!", url)
236 logger.error("{}>>>执行失败", case_name)
237 self.assertTrue(set(expected_result) set(actual_result_list))
238
239 logger.info("##########用例分隔符##########\n")
240 # sleep(3)
241 # 等待时间为3秒,也可以调整为其他值
242
243
244 if __name__ == ‘__main__‘:
245 beautiful_report_run(DemoTest)
246 # 调用BeautifulReport运行方式
1 """
2 整个工程的配置文件
3 """
4
5 import os
6 import sys
7 import time
8
9 from loguru import logger
10
11 parameter = sys.argv[1]
12 # 从命令行获取参数
13
14 environment = os.getenv("measured_environment", parameter)
15 # 环境变量
16
17 if environment == "dev":
18 service_domain = "http://www.dev.com"
19 # 开发环境
20 db_host = ‘mysql.dev.com‘
21 db_port = 3306
22 elif environment == "test":
23 service_domain = "http://www.test.com"
24 # 测试环境
25 db_host = ‘mysql.test.com‘
26 db_port = 3307
27 elif environment == "pre":
28 service_domain = "http://www.pre.com"
29 # 预生产环境
30 db_host = ‘mysql.pre.com‘
31 db_port = 3308
32 elif environment == "formal":
33 service_domain = "https://www.formal.com"
34 # 生产环境
35 db_host = None
36 db_port = None
37
38 db_user = ‘root‘
39 db_password = ‘123456‘
40 db_database = ‘‘
41 # MySQL数据库配置
42
43
44 current_path = os.path.dirname(os.path.dirname(__file__))
45 # 获取当前目录的父目录的绝对路径
46 # 也就是整个工程的根目录
47 case_path = os.path.join(current_path, "case")
48 # 测试用例的目录
49 yaml_path = os.path.join(current_path, "resource")
50 # yaml文件的目录
51 today = time.strftime("%Y-%m-%d", time.localtime())
52 # 年月日
53
54 report_path = os.path.join(current_path, "report")
55 # 测试报告的目录
56 if os.path.exists(report_path):
57 pass
58 else:
59 os.mkdir(report_path, mode=0o777)
60
61 log_path = os.path.join(current_path, "log")
62 # 日志的目录
63 if os.path.exists(log_path):
64 pass
65 else:
66 os.mkdir(log_path, mode=0o777)
67
68 logging_file = os.path.join(log_path, "log{}.log".format(today))
69
70 logger.add(
71 logging_file,
72 format="{time:YYYY-MM-DD HH:mm:ss}|{level}|{message}",
73 level="INFO",
74 rotation="500 MB",
75 encoding="utf-8",
76 )
77 # loguru日志配置