API 自动化框架
2021-06-08 23:06
                         标签:post   ica   挂载   except   requests   全局变量   文件头   cap   docker      个人认为接口自动化测试使用python语言编写更加简单,但所有接口自动化项目代码的思维都是一样的 1.case:存放用例数据的包,将所有用例数据以配置文件形式传入 2.core:核心包   1)config.py:封装ConfigParser解析获取配置文件数据的方法     python的内置模块ConfigParser:不太了解的可以百度   2)log.py:封装log的模块   3)request.py:封装接口测试的方法   4)mysql.py:封装连接数据库的方法 3.function:功能包,封装执行用例的方法,以及生成测试报告的方法   生成测试报告的原理:使用file生成.md文件,使用docker,pull mkdocs相关镜像,将data volum挂载到搭载测试报告的宿主机上,访问对应的域名即可 4.report:存放所有生成的测试报告 5.constant.py:存放所有全局变量的模块 6.run.py:运行测试用例,以及生成测试报告的模块               API 自动化框架 标签:post   ica   挂载   except   requests   全局变量   文件头   cap   docker    原文地址:https://www.cnblogs.com/shadow-yin/p/10678680.htmlAPI 自动化框架
1.项目包结构

2.核心代码
 config.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 基础包:配置服务
import ConfigParser
import core.log as log
config = ConfigParser.ConfigParser()
logging = log.get_logger()
def get_config(filename):
    """
    获取文件配置
    :param filename: 配置文件名
    :return: None
    """
    global config
    try:
        config.read(filename)
        return True
    except Exception, e:
        logging.error("读取配置失败 %s" % e)
def get_data(title, key):
    """
    参数配置
    :param title: 配置文件的头信息
    :param key: 配置文件的key值
    :return: 配置文件的value
    """
    try:
        value = config.get(title, key)
        type(value)
        return value
    except Exception, e:
        logging.error("获取配置文件参数失败 %s" % e)
def get_title_list():
    """
    获取所有title
    :return: title list
    """
    try:
        title = config.sections()
        return str(title).decode("string_escape")
        # return ‘\n‘.join(title)
    except Exception, e:
        logging.error("获取title信息失败 %s", e)
  
mysql.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# author: zhizhi
# 基础包: MySQL
import pymysql.cursors
import core.log as log
logging = log.get_logger()
conn = None
def connect(host, user, password, db, charset=‘utf8‘):
    """
    链接Mysql
    :param host: 地址
    :param user: 用户
    :param password: 密码
    :param db: 数据库名
    :param charset: 数据类型
    :return: 链接
    """
    global conn
    if conn == None:
        conn = pymysql.connect(host=host,
                               user=user,
                               password=password,
                               db=db,
                               charset=charset,
                               cursorclass=pymysql.cursors.DictCursor)
    return conn
def execute(sql):
    """
    执行SQL
    :param sql: 执行的SQL
    :return: 影响行数
    """
    global conn
    try:
        with conn.cursor() as cursor:
            res = cursor.execute(sql)
        conn.commit()
        # 这里一定要写commit 不然提交的sql 都会被事务回滚
        return res
    except Exception, e:
        logging.error("sql is empty or error %s" % e)
def close():
    """
    关闭MySQL连接
    :return: None
    """
    global conn
    conn.close()
request.py
#!/usr/bin/python
#-*- coding: UTF-8 -*-
# 基础包:接口测试的封装
import requests
import core.log as log
import json
logging = log.get_logger()
def change_type(value):
    """
    对dict类型进行中文识别
    :param value: 传的数据值
    :return: 转码后的值
    """
    try:
        if isinstance(eval(value), str):
            return value
        if isinstance(eval(value), dict):
            result = eval(json.dumps(value))
            return result
    except Exception, e:
        logging.error("类型问题 %s", e)
def api(method, url, data ,headers):
    """
    自定义一个接口测试的方法
    :param method: 请求类型
    :param url: 地址
    :param data: 数据
    :param headers: 请求头
    :return: success(str)
    """
    global results
    try:
        if method == ("post" or "POST"):
            results = requests.post(url, data, headers=headers,cookies=cookie)
        if method == ("get" or "GET"):
            results = requests.get(url, data, headers=headers,cookies=cookie)
        response = results.json()
        success = response.get("success")
        return success
    except Exception, e:
        logging.error("service is error", e)
def set_cookieApi(method, url, data ,headers):
    """
    自定义一个登录接口测试的方法
    :param method: 请求类型
    :param url: 地址
    :param data: 数据
    :param headers: 请求头
    :return: success(bool)
    """
    global cookie
    try:
        if method == ("post" or "POST"):
            results = requests.post(url, data, headers=headers)
        if method == ("get" or "GET"):
            results = requests.get(url, data, headers=headers)
        response = results.json()
        success = str(response.get("success"))
        cookie = requests.utils.dict_from_cookiejar(results.cookies)
        return success
    except Exception, e:
        logging.info("LoginApi请求失败", e)
func.py: 可以通过接口返回数据与自己写的sql查询的数据进行对比,判断用例是否通过,所以提供了mysql通用模块
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 业务包:通用函数
import core.mysql as mysql
import core.log as log
import core.request as request
import core.config as conf
import constants as cs
import os
logging = log.get_logger()
class ApiTest:
    """接口测试业务类"""
    def __init__(self):
        pass
    def prepare_data(self, host, user, password, db, sql):
        """
        数据准备,添加测试数据
        :param host: 服务地址
        :param user: 用户
        :param password: 密码
        :param db: 数据库名
        :param sql: 执行的SQL
        :return:
        """
        mysql.connect(host, user, password, db)
        res = mysql.execute(sql)
        mysql.close()
        logging.info("Run sql: the row number affected is %s" % res)
        return res
    def get_prepare_sql(self, filename, key):
        """
        获取预备执行的SQL
        :param title: 配置文件头信息
        :param key: 配置文件值
        :return: Value
        """
        try:
            conf.get_config(filename)
            value = conf.get_data(title=cs.TITLE, key=key)
            return value
        except Exception, e:
            logging.error("获取用例参数值失败 %s" % e)
    def new_report_menu(self, filename):
        """
        这个方法主要是通过写入文件的方法,先打开cs.YML_REPORT也就是
        mkdocs.yml文件,判断文件中是否存在当前写入的内容。
        :param filename: 测试用例文件
        :return: 测试报告内容
        """
        try:
            result = os.path.exists(cs.REPORT_PATH)
            if result == True:
                conf.get_config(filename)
                reportName = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
                report_name = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.R_NAME))
                file = open(cs.YML_REPORT, ‘r‘)
                list_con = file.readlines()
                content = str(list_con).decode("string_escape")
                fileContent = "- [%s, %s]"
                row = "\n"
                _content = fileContent % (reportName + cs.NOW, report_name)
                con = row + _content
                if _content not in content:
                    f = open(cs.YML_REPORT, ‘a+‘)
                    f.write(con)
                else:
                    logging.info("内容已经存在 %s" % _content)
        except Exception, e:
            logging.error("文件路径不存在 %s", e)
    def write_report(self, content):
        """
        这个方法用于书写测试报告从而解决之前的通过
        logging方式写入导致其他的日志无法实现写入
        :param content: 传入文件的内容
        :return: None
        """
        reportName = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
        _reportName = reportName + cs.NOW
        filename = cs.REPORT_PATH + _reportName
        try:
            file = open(filename, ‘a+‘)
            file.writelines(content)
        except Exception, e:
            logging.error("文件路径不存在 %s", e)
    def execute_case(self, filename):
        """
        执行接口测试用例的方法
        :param filename: 用例文件名称
        :return: 测试结果
        """
        conf.get_config(filename)
        list = eval(conf.get_title_list())
        for i in range(1, len(list)):
            title = list[i]
            number = conf.get_data(title, key=cs.NUMBER)
            name = conf.get_data(title, key=cs.NAME)
            method = conf.get_data(title, key=cs.METHOD)
            url = conf.get_data(title, key=cs.URL)
            data = conf.get_data(title, key=cs.DATA)
            _data = request.json.dumps(data,ensure_ascii=False,indent=4)
            headers = eval(conf.get_data(title, key=cs.HEADERS))
            # headers[‘Cookie‘]=cookie
            _headers = request.json.dumps(headers,ensure_ascii=False,indent=4)
            testUrl = cs.DOMAIN + url
            login=cs.LOGIN
            if(title==login):
                actualCode = request.set_cookieApi(method, testUrl, data, headers)
            else:
                actualCode = str(request.api(method, testUrl, data, headers))
            expectCode = str(conf.get_data(title, key=cs.CODE))
            if actualCode != expectCode:
                logging.info("新增一条接口失败报告")
                self.write_report(cs.API_TEST_FAIL % (name, number, method, testUrl, headers,data, expectCode, actualCode))
            else:
                logging.info("新增一条接口成功报告")
                self.write_report(cs.API_TEST_SUCCESS % (name, number, method, testUrl, headers,data, expectCode, actualCode))
    def run_test(self, filename):
        """
        普通接口测试类方法
        :param filename: 接口的用例name
        :return: 测试报告
        """
        reportName =eval( conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
        _filename = cs.REPORT_PATH + reportName + cs.NOW
        try:
            if os.path.exists(_filename):
                os.remove(_filename)
                self.execute_case(filename)
            else:
                self.execute_case(filename)
        except Exception, e:
            logging.error("执行接口测试失败 %s", e)
    def write_report_result(self):
        """
        这个方法用于书写测试报告结果
        :return: None
        """
        reportName = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
        _filename = cs.REPORT_PATH + reportName + cs.NOW
        try:
            f = file(_filename)
            content = f.read()
            if content != None:
                _count = content.count("Number")
                _fail = content.count("Case Fail")
                _pass = content.count("Case Pass")
                space = content.split(‘\n‘)
                space.insert(0,cs.RESULT_CONTENT % (_count, _pass, _fail))
                _content_ = ‘\n‘.join(space)
                fp = file(_filename,‘r+‘)
                fp.write(_content_)
        except Exception, e:
            logging.error("文件路径不存在 %s", e)
run.py
# !/usr/bin/python
# -*- coding: UTF-8 -*-
# 执行包:runscript
import function.func as func
ApiTest = func.ApiTest()
FILENAME = ‘login.ini‘
"""1.新建测试报告目录"""
ApiTest.new_report_menu(filename=func.cs.CASE_PATH+FILENAME)
"""2.执行测试用例"""
ApiTest.run_test(filename=func.cs.CASE_PATH+FILENAME)
"""3.统计测试报告结果"""
ApiTest.write_report_result()
配置文件:login.ini
[Test Report]
report = ‘Enterprise Version‘
reportName = ‘Regression Testing Report‘
[login]
number = 1
name = login
method = post
url = /s1/web/login
data = {data:{‘userCode‘:‘admin‘,‘password‘:‘til@5158‘}}
headers = {‘Content-Type‘: ‘application/json;charset=UTF-8;‘}
code = True
[brand]
number = 2
name = get_brand
method = post
url = /s1/brand/getList
data = {"pageSize":10,"pageNum":1}
headers = {‘Content-Type‘: ‘application/json; charset=UTF-8;‘}
code = True
上一篇:windows cmd 命令大全