API接口自动化测试 – 新手搭建思路分享(Pytest)

一、背景介绍

  • 由于被测系统的复杂度不断上升,导致传统的测试方法成本上升且测试效率大幅下降;因此引入自动化测试是必然趋势。
    鉴于接口测试相对于UI测试更加稳定,且相对容易自动化持续集成;故以接口方式作为自动化项目切入,主要目的如下:
  • 1、用于回归测试,解决手工回归测试带来的繁琐且重复工作;
  • 2、线上或线下巡检测试,结合jenkins部署持续集成,及时发现运行环境存在的问题;
  • 3、提升团队自动化测试能力,为系统业务提供强有力的测试手段;

二、构思设计及主要实现点

2.2)、需实现的功要点:

1、 可持续集成采用方式:构建部署 gitlab + jenkins+ docker 方式
2、 自动化执行-触发点多元化:
2.1)、API接口方式:通过接口字段传参方式,触发自动化测试用例执行;
2.2)、jenkins构建部署方式:通过被测对象构建时,流水线触发自动化测试用例执行;
2.3)、命令行方式:在部署的docke容器内或者本地开发调试时,通过命令行方式触发执行;
3、 编程代码与测试数据分离:自动化服务部署后,因测试用例数据仍是高频操作模块,所以代码和测试数据是需要做分离的,整块测试数据以yaml方式设计管理;
4、 高频的用例数据及文件处理方式:
4.1)、可通过前端页面交互方式,支持对测试用例文件上传、下载操作;
4.2)、可通过API接口方式对用例数据进行增删改查(即编辑数据无需进入docker容器内修改,更加便捷);
5、 缓存机制处理:主要对于账号登录信息(如token)、被测接口有依赖的前置数据、流程用例方面依赖接口的数据等,需要做缓存机制处理;
6、 测试报告功能:为了便于项目所有人查看,测试报告需支持在线链接查看(此块引用Allure服务);
说明:此块在线 Allure server是引用GitHub上的大佬(kochetkovma),有需要可直接去搜索下
7、 日志记录留痕:基本的日志记录信息,这是常规必备点。

2.2)、接口自动化测试-架构图:

  • 其实看这张图就足够了,表达的思路都是这里,也画了我差不多一天时间,吐血~

    接口自动化架构图.png

三、直接看最后的成果(以触发点1和2为例):

1、触发点1:通过接口请求触发(引用postman工具):

  • 步骤1、调用触发执行接口:传入执行命令(以pytest命令为基础);如图“-m smoke_flow_one -s”只执行指定标签的用例;
  • 步骤2、执行接口响应:接口返回本次执行用例的状态(msg字段:通过或不通过)以及测试报告链接(data字段);
  • 步骤3、测试报告:打开链接可查看到本次执行用例的详细情况;

    步骤1、postman接口调用.png

    步骤2、通过接口返回链接打开测试报告.png

    步骤3、查看本次测试执行情况

2、触发点2:被测对象jar包更新-触发执行(jenkins):

  • 步骤1、jenkins流水线配置:被测服务在流水线配置中添加触发自动化测试节点,在被测对象部署完毕后会进入自动化执行环节;
  • 步骤2、流水线节点:根据自动化测试节点判断本次构建是否测试通过,且可通过jenkins logs内的链接查看完整测试报告;
  • 步骤3、测试报告:打开链接查看本次执行用例的详细情况;

    步骤1、jenkins流水线增加触发自动化节点

    jenkins logs获取链接查看报告

四、高频操作的测试用例数据及文件处理:

4.1、数据操作:通过接口调用修改用例数据-->>增删改查(如下以“查”和“改”为示例):
  • 步骤1、根据传入的类型字段及指定文件,可查询用例数据:

    1、查询-指定用例数据
  • 步骤2、根据传入的字段参数,可修改对应的用例数据:

    2、修改-指定用例数据
4.2、文件操作:以页面形式对用例文件进行上传/下载操作(水平有限,只能搞简陋版的html页面了):
  • 步骤1、对文件下载:通过html页面进入各目录,可自定义选择文件下载;

    用例文件上传.png
  • 步骤2、对文件上传:对已修改的用例文件或者需要新增文件,可上传到自定义指定目录;

    用例文件下载.png

四、自动化项目结构-示例( 因琐碎点太多,只抽取主流点作为说明)

  • 作为一个测试编程新手,比不了专业的开发人员;各位大佬,如有优化或其他构思思路欢迎指点评论,我再调整优化;

4.1、项目目录结构说明:

代码目录结构

4.2、入口路由方面(以触发1为例)-->示例:

1)、入口(Flask框架):摘取server.py一小段接口样例

'''摘取server.py其中一个接口'''
app = Flask(__name__)
@app.route("/autotest/runcase", methods=['POST'])
def run_auto_test():
    """运行自动化:return:"""
    try:
        command = request.json.get("command").strip()
    except AttributeError:
        return jsonify({"code": -1, 'msg': '缺少必要参数:command'})
    # executor.submit(__run_jobs, run_command)
    re_data, re_start = run_py(['0', command])
    re_msg = '通过' if re_start == 0 else '不通过'
    return jsonify({"code": re_start, "msg": "自动化测试:" + re_msg, "data": re_data})

'''摘取runner.py中pytest命令处理模块'''
def run_py(cmdsystem):
    case_config = Case_Config()
    now_time = time.strftime('%Y%m%d-%H%M%S')
    report_data = 'TestReport/report_data/{}/'.format(now_time)
    report_html = 'TestReport/html/{}/'.format(now_time)
    run_sys = 'pytest {} --alluredir={}  --clean-alluredir'.format('Testsuites/TestCase/' + str(cmdsystem[1]),
                                                                   report_data)
    sys_start = os.system(run_sys)
    re_data = '1、执行命令:run_sys=' + run_sys
    try:
        if cmdsystem[2] in ('debug', 'false', False):
            run_allure = '{} generate {} -o {} --clean'.format(r'allure-2.17.0/bin/allure', report_data, report_html)
            re_data = re_data + '/n2、执行命令:run_allure={};'.format(run_allure)
            os.system(run_allure)
    except IndexError:
        ip_port = case_config.yml_cfdata()['allure_ip_port']
        allure_report_url = Allure_Server(ip_port).auto_report(report_data, now_time)
        re_data = {'报告链接地址': allure_report_url, "执行命令": run_sys}
    test_result = case_config.yml_cfdata('Testsuites/TestData/com_data/test_result.yml','test_result')
    logger.info('***用例自动化测试结果:' + test_result)
    re_start_code = 0 if test_result == 'pass' and sys_start == 0 else -1
    logger.info(re_data)
    return re_data, re_start_code

2)、摘取其中一段test_x.py测试用例代码-->示例:

@pytest.mark.run(order=131)
@allure.epic("Web-后端管理页面")
@allure.feature("案例事务操作模块")
@allure.story("推送案例")
class Test_ip_port_case:
    @classmethod
    def setup_class(cls):  # 类初始化
        cls.Case_Config = Case_Config(casedata_path)
        cls.case_config = cls.Case_Config.yml_cfdata()
        cls.regular_data = cls.Case_Config.regular_data
        cls.case_ymldata = cls.Case_Config.yml_data()
        cls.local_var = cls.case_ymldata['local_var']
        cls.regist_no = cls.local_var['regist_no']
        cls.headers = {**{'Authorization': 'Bearer ' + cls.Case_Config.cachetoken()}, **cls.local_var['headers']}
        cls.case_run = Case_Run(cls.local_var['api_name'], cls.case_config['pgsql_zds'], headers=cls.headers,api_host=cls.local_var['ip_port']).case_run
    def setup_method(self):  # 方法初始化
        print('/n === 测试案件前置动作 ===')
        run_sql = "delete from case_info where regist_no='{regist_no}';delete from task_info where regist_no='{regist_no}';delete from yxt_survey_information where regist_no='{regist_no}';".format(
            regist_no=self.regist_no)
        DB_Connect(self.case_config['pgsql_zds']).pgsqldb(run_sql, 'delete')

    @pytest.mark.smoke_flow_four
    @pytest.mark.smoke_main
    def test_ip_port_case_1(self, set_global_data):
        case_data = self.case_ymldata['case_001']
        request_data = case_data['request_data']['body']
        expect_data = case_data['expect_data']['response_data']
        status_code = case_data['expect_data']['status_code']
        sql_check = self.regular_data(case_data['other_data']['sql_check'], {'regist_no': self.regist_no})
        self.case_run(case_data['case_name'], status_code=status_code, request_data=request_data, expect_data=expect_data,sql_check=sql_check)

    @pytest.mark.parametrize('case_code',['case_002','case_003','case_004'])#数据驱动,执行多条相同验证场景用例
    def test_ip_port_case_2_4(self, set_global_data,case_code):
        case_data = self.case_ymldata[case_code]
        request_data = case_data['request_data']['body']
        expect_data = case_data['expect_data']['response_data']
        status_code = case_data['expect_data']['status_code']
        self.case_run(case_data['case_name'], status_code=status_code, request_data=request_data, expect_data=expect_data)

4.3、yaml文件用例/配置数据-->>示例:

  • 3.1)、测试用例数据:
local_var:
  remarks: 案例信息模块-更改案例状态
  headers: &headers { "Content-Type": "application/json" }
  api_name: &api_name v2_caseTasks_var
  cache_data: &cache_data { 'path': 'user_cache/users_current.yml','key': 'users_current' }

case_001:
  case_name: 无法联系客户
  other_data: { 'sql_check': { 'sql_node': [ "select status from case_info where regist_no='&{regist_no}&' and is_deleted='f'" ],  'actual_node': [ 'expect_data' ] } }
  request_data: { "body": [ { "taskType": 4,"customized": false,"exchangeReasonId": 2 } ],"headers": *headers }
  expect_data: { 'status_code': 200,'response_data': [ { "status": 50 } ] }
  remarks: 无
case_002:
  case_name: 客户不同意使用
  other_data: { 'sql_check': { 'sql_node': [ "select status from case_info where regist_no='&{regist_no}&' and is_deleted='f'" ],  'actual_node': [ 'expect_data' ] } }
  request_data: { "body": [ { "taskType": 4,"customized": false,"exchangeReasonId": 3 } ],"headers": *headers }
  expect_data: { 'status_code': 200,'response_data':{"msg":"任务提交成功"} }
  remarks: 无
  • 3.2)、测试接口配置文件:
local_var: &local_var
  url: &url /api/api/
  headers: &headers { 'Content-Type': 'application/json' }

case_mine: { 'name': '个人会话列表接口','method': 'get','headers': *headers,'url': '/api/v2/case/mine','data': { } }
case_send_sms: { 'name': '短信发送(变更手机号码)','method': 'post','headers': *headers,'url': '/api/case/send-sms','data': { } }
case_remote_var: { 'name': '获取案例信息', 'method': 'get', 'headers': *headers, 'url': '/api/case/remote', 'data': { } }
cases_all: { 'name': '会话历史查询接口','method': 'get','headers': *headers,'url': '/api/cases/all','data': { } }

4.4、测试报告处理逻辑-->>示例:

import requests
import time, json
import zipfile, os
from Testsuites.TestModel import Case_Config
class Allure_Server(object):
    """调用allure-server服务生成报告,并返回链接地址"""

    def __init__(self, ip_port):
        self.result_url = 'http://' + ip_port + '/api/result'
        self.report_url = 'http://' + ip_port + '/api/report'
        self.test_result = 'Testsuites/TestData/com_data/test_result.yml'

    def zip_file(self, src_dir, zip_name='allure.zip'):
        z = zipfile.ZipFile(src_dir + zip_name, 'w', zipfile.ZIP_DEFLATED)
        for dirpath, dirnames, filenames in os.walk(src_dir):
            fpath = dirpath.replace(src_dir, '')
            fpath = fpath and fpath + os.sep or ''
            for filename in filenames:
                if filename != zip_name:
                    z.write(os.path.join(dirpath, filename), fpath + filename)
        z.close()
        return zip_name

    def api_result(self, file_name):
        file_zip = open(file_name, 'rb')
        files = {'allureResults': (file_name, file_zip, 'application/x-zip-compressed')}
        re_data = requests.post(self.result_url, files=files)
        return json.loads(re_data.text)

    def api_report(self, case_uuid, date_time=None, now_time=None):
        date_time = time.strftime('%Y%m%d') if date_time == None else date_time
        now_time = str(int(time.time() * 1000)) if now_time == None else now_time
        json_data = {"reportSpec": {"path": [date_time, now_time], "executorInfo": {"buildName": now_time}},
                     "results": [case_uuid], "deleteResults": False}
        re_data = requests.post(self.report_url, json=json_data, headers={"Content-Type": "application/json"})
        return json.loads(re_data.text)

    def auto_report(self, file_path, *time_data):
        zip_name = self.zip_file(file_path)
        result_uuid = self.api_result(file_path + zip_name)['uuid']
        time.sleep(2)
        report_data = self.api_report(result_uuid, *time_data)['latest']
        Case_Config().yml_cache('report_link', report_data, no_cache=self.test_result)
        return report_data

五、日志模块方面:

class Log(object):
    level_relations = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'crit': logging.CRITICAL}  # 日志级别关系映射
    def __init__(self, filename, level='info', when='D', backCount=3,
                 fmt='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
        self.logger = logging.getLogger(filename)
        format_str = logging.Formatter(fmt)  # 设置日志格式
        self.logger.setLevel(self.level_relations.get(level))  # 设置日志级别
        sh = logging.StreamHandler()  # 往屏幕上输出
        sh.setFormatter(format_str)  # 设置屏幕上显示的格式
        th = handlers.TimedRotatingFileHandler(filename=filename, when=when, backupCount=backCount,
                                               encoding='utf-8')  # 往文件里写入#指定间隔时间自动生成文件的处理器
        # 实例化TimedRotatingFileHandler
        # interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
        # S 秒/M 分/H 小时/D 天/W 每星期(interval==0时代表星期一)/midnight 每天凌晨
        th.setFormatter(format_str)  # 设置文件里写入的格式
        self.logger.addHandler(sh)  # 把对象加到logger里
        self.logger.addHandler(th)
def create_file():
    filepath = os.path.realpath(__file__)
    output_dir = os.path.abspath(os.path.join(filepath, "../../logs"))
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    log_name = '{}.log'.format(time.strftime('%Y-%m-%d'))
    filename = os.path.join(output_dir, log_name)
    return filename
filename = create_file()
log = Log(filename, level='debug')
logger = log.logger

**备注:其他细节就不摘取示例了,内容太多。主要目的是表达搭建的构思以及看下示例,有个初步的概念;

六、结束收尾语:

  • 1、自动化测试项目写写停停花了两个多月,虽还有很多需要完善和优化的,但都已完成原始架构的设计;好歹算是个成品了。
  • 2、目前自动化项目已经在实际系统上使用,整体效果还不错,基本上能满足常规的回归测试场景,提升测试效率,非常nice ~
  • 3、各位测试伙伴们,新手测试的第一次构思搭建这自动化项目,不妥之处望给与指点纠正。
  • 4、还有动动小手,点点赞撒;

版权声明:
作者:siwei
链接:https://www.techfm.club/p/46746.html
来源:TechFM
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>