API 自动化框架

2021-06-08 23:06

阅读:445

标签:post   ica   挂载   except   requests   全局变量   文件头   cap   docker   

API 自动化框架

  个人认为接口自动化测试使用python语言编写更加简单,但所有接口自动化项目代码的思维都是一样的

1.项目包结构

技术图片

1.case:存放用例数据的包,将所有用例数据以配置文件形式传入

2.core:核心包

  1)config.py:封装ConfigParser解析获取配置文件数据的方法

    python的内置模块ConfigParser:不太了解的可以百度

  2)log.py:封装log的模块

  3)request.py:封装接口测试的方法

  4)mysql.py:封装连接数据库的方法

3.function:功能包,封装执行用例的方法,以及生成测试报告的方法

  生成测试报告的原理:使用file生成.md文件,使用docker,pull mkdocs相关镜像,将data volum挂载到搭载测试报告的宿主机上,访问对应的域名即可

4.report:存放所有生成的测试报告

5.constant.py:存放所有全局变量的模块

6.run.py:运行测试用例,以及生成测试报告的模块

2.核心代码

 config.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 基础包:配置服务

import ConfigParser
import core.log as log

config = ConfigParser.ConfigParser()
logging = log.get_logger()

def get_config(filename):
    """
    获取文件配置
    :param filename: 配置文件名
    :return: None
    """
    global config
    try:
        config.read(filename)
        return True
    except Exception, e:
        logging.error("读取配置失败 %s" % e)


def get_data(title, key):
    """
    参数配置
    :param title: 配置文件的头信息
    :param key: 配置文件的key值
    :return: 配置文件的value
    """
    try:
        value = config.get(title, key)
        type(value)
        return value
    except Exception, e:
        logging.error("获取配置文件参数失败 %s" % e)


def get_title_list():
    """
    获取所有title
    :return: title list
    """
    try:
        title = config.sections()
        return str(title).decode("string_escape")
        # return ‘\n‘.join(title)
    except Exception, e:
        logging.error("获取title信息失败 %s", e)

  

mysql.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# author: zhizhi
# 基础包: MySQL

import pymysql.cursors
import core.log as log


logging = log.get_logger()
conn = None

def connect(host, user, password, db, charset=utf8):
    """
    链接Mysql
    :param host: 地址
    :param user: 用户
    :param password: 密码
    :param db: 数据库名
    :param charset: 数据类型
    :return: 链接
    """
    global conn
    if conn == None:
        conn = pymysql.connect(host=host,
                               user=user,
                               password=password,
                               db=db,
                               charset=charset,
                               cursorclass=pymysql.cursors.DictCursor)
    return conn


def execute(sql):
    """
    执行SQL
    :param sql: 执行的SQL
    :return: 影响行数
    """
    global conn
    try:
        with conn.cursor() as cursor:
            res = cursor.execute(sql)
        conn.commit()
        # 这里一定要写commit 不然提交的sql 都会被事务回滚
        return res
    except Exception, e:
        logging.error("sql is empty or error %s" % e)


def close():
    """
    关闭MySQL连接
    :return: None
    """
    global conn
    conn.close()

 

request.py

#!/usr/bin/python
#-*- coding: UTF-8 -*-
# 基础包:接口测试的封装

import requests
import core.log as log
import json



logging = log.get_logger()

def change_type(value):
    """
    对dict类型进行中文识别
    :param value: 传的数据值
    :return: 转码后的值
    """
    try:
        if isinstance(eval(value), str):
            return value
        if isinstance(eval(value), dict):
            result = eval(json.dumps(value))
            return result
    except Exception, e:
        logging.error("类型问题 %s", e)


def api(method, url, data ,headers):
    """
    自定义一个接口测试的方法
    :param method: 请求类型
    :param url: 地址
    :param data: 数据
    :param headers: 请求头
    :return: success(str)
    """
    global results
    try:
        if method == ("post" or "POST"):
            results = requests.post(url, data, headers=headers,cookies=cookie)
        if method == ("get" or "GET"):
            results = requests.get(url, data, headers=headers,cookies=cookie)

        response = results.json()
        success = response.get("success")
        return success
    except Exception, e:
        logging.error("service is error", e)

def set_cookieApi(method, url, data ,headers):
    """
    自定义一个登录接口测试的方法
    :param method: 请求类型
    :param url: 地址
    :param data: 数据
    :param headers: 请求头
    :return: success(bool)
    """
    global cookie
    try:
        if method == ("post" or "POST"):
            results = requests.post(url, data, headers=headers)
        if method == ("get" or "GET"):
            results = requests.get(url, data, headers=headers)
        response = results.json()
        success = str(response.get("success"))
        cookie = requests.utils.dict_from_cookiejar(results.cookies)
        return success
    except Exception, e:
        logging.info("LoginApi请求失败", e)

 

func.py: 可以通过接口返回数据与自己写的sql查询的数据进行对比,判断用例是否通过,所以提供了mysql通用模块

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 业务包:通用函数


import core.mysql as mysql
import core.log as log
import core.request as request
import core.config as conf
import constants as cs
import os


logging = log.get_logger()


class ApiTest:
    """接口测试业务类"""

    def __init__(self):
        pass

    def prepare_data(self, host, user, password, db, sql):
        """
        数据准备,添加测试数据
        :param host: 服务地址
        :param user: 用户
        :param password: 密码
        :param db: 数据库名
        :param sql: 执行的SQL
        :return:
        """
        mysql.connect(host, user, password, db)
        res = mysql.execute(sql)
        mysql.close()
        logging.info("Run sql: the row number affected is %s" % res)
        return res

    def get_prepare_sql(self, filename, key):
        """
        获取预备执行的SQL
        :param title: 配置文件头信息
        :param key: 配置文件值
        :return: Value
        """
        try:
            conf.get_config(filename)
            value = conf.get_data(title=cs.TITLE, key=key)
            return value
        except Exception, e:
            logging.error("获取用例参数值失败 %s" % e)

    def new_report_menu(self, filename):
        """
        这个方法主要是通过写入文件的方法,先打开cs.YML_REPORT也就是
        mkdocs.yml文件,判断文件中是否存在当前写入的内容。
        :param filename: 测试用例文件
        :return: 测试报告内容
        """
        try:
            result = os.path.exists(cs.REPORT_PATH)
            if result == True:
                conf.get_config(filename)
                reportName = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
                report_name = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.R_NAME))
                file = open(cs.YML_REPORT, r)
                list_con = file.readlines()
                content = str(list_con).decode("string_escape")
                fileContent = "- [%s, %s]"
                row = "\n"
                _content = fileContent % (reportName + cs.NOW, report_name)
                con = row + _content

                if _content not in content:
                    f = open(cs.YML_REPORT, a+)
                    f.write(con)
                else:
                    logging.info("内容已经存在 %s" % _content)
        except Exception, e:
            logging.error("文件路径不存在 %s", e)

    def write_report(self, content):
        """
        这个方法用于书写测试报告从而解决之前的通过
        logging方式写入导致其他的日志无法实现写入
        :param content: 传入文件的内容
        :return: None
        """
        reportName = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
        _reportName = reportName + cs.NOW
        filename = cs.REPORT_PATH + _reportName
        try:
            file = open(filename, a+)
            file.writelines(content)
        except Exception, e:
            logging.error("文件路径不存在 %s", e)



    def execute_case(self, filename):
        """
        执行接口测试用例的方法
        :param filename: 用例文件名称
        :return: 测试结果
        """
        conf.get_config(filename)
        list = eval(conf.get_title_list())

        for i in range(1, len(list)):
            title = list[i]

            number = conf.get_data(title, key=cs.NUMBER)

            name = conf.get_data(title, key=cs.NAME)
            method = conf.get_data(title, key=cs.METHOD)
            url = conf.get_data(title, key=cs.URL)
            data = conf.get_data(title, key=cs.DATA)
            _data = request.json.dumps(data,ensure_ascii=False,indent=4)
            headers = eval(conf.get_data(title, key=cs.HEADERS))
            # headers[‘Cookie‘]=cookie
            _headers = request.json.dumps(headers,ensure_ascii=False,indent=4)

            testUrl = cs.DOMAIN + url
            login=cs.LOGIN
            if(title==login):
                actualCode = request.set_cookieApi(method, testUrl, data, headers)
            else:
                actualCode = str(request.api(method, testUrl, data, headers))

            expectCode = str(conf.get_data(title, key=cs.CODE))


            if actualCode != expectCode:
                logging.info("新增一条接口失败报告")
                self.write_report(cs.API_TEST_FAIL % (name, number, method, testUrl, headers,data, expectCode, actualCode))
            else:
                logging.info("新增一条接口成功报告")
                self.write_report(cs.API_TEST_SUCCESS % (name, number, method, testUrl, headers,data, expectCode, actualCode))

    def run_test(self, filename):
        """
        普通接口测试类方法
        :param filename: 接口的用例name
        :return: 测试报告
        """
        reportName =eval( conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))

        _filename = cs.REPORT_PATH + reportName + cs.NOW
        try:
            if os.path.exists(_filename):
                os.remove(_filename)
                self.execute_case(filename)
            else:
                self.execute_case(filename)
        except Exception, e:
            logging.error("执行接口测试失败 %s", e)

    def write_report_result(self):
        """
        这个方法用于书写测试报告结果
        :return: None
        """
        reportName = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
        _filename = cs.REPORT_PATH + reportName + cs.NOW
        try:
            f = file(_filename)
            content = f.read()
            if content != None:
                _count = content.count("Number")
                _fail = content.count("Case Fail")
                _pass = content.count("Case Pass")
                space = content.split(\n)
                space.insert(0,cs.RESULT_CONTENT % (_count, _pass, _fail))
                _content_ = \n.join(space)
                fp = file(_filename,r+)
                fp.write(_content_)
        except Exception, e:
            logging.error("文件路径不存在 %s", e)

 

run.py

# !/usr/bin/python
# -*- coding: UTF-8 -*-
# 执行包:runscript

import function.func as func

ApiTest = func.ApiTest()

FILENAME = login.ini

"""1.新建测试报告目录"""
ApiTest.new_report_menu(filename=func.cs.CASE_PATH+FILENAME)

"""2.执行测试用例"""
ApiTest.run_test(filename=func.cs.CASE_PATH+FILENAME)

"""3.统计测试报告结果"""
ApiTest.write_report_result()

 

配置文件:login.ini

[Test Report]
report = Enterprise Version
reportName = Regression Testing Report

[login]
number = 1
name = login
method = post
url = /s1/web/login
data = {data:{userCode:admin,password:til@5158}}
headers = {Content-Type: application/json;charset=UTF-8;}
code = True

[brand]
number = 2
name = get_brand
method = post
url = /s1/brand/getList
data = {"pageSize":10,"pageNum":1}
headers = {Content-Type: application/json; charset=UTF-8;}
code = True

 

 

 

API 自动化框架

标签:post   ica   挂载   except   requests   全局变量   文件头   cap   docker   

原文地址:https://www.cnblogs.com/shadow-yin/p/10678680.html


评论


亲,登录后才可以留言!