flask + Python3 实现的的API自动化测试平台---- IAPTest接口测试平台
上一篇 / 下一篇 2017-11-24 12:22:14 / 个人分类:python
**背景:1.平时测试接口,总是现写代码,对测试用例的管理,以及测试报告的管理持久化做的不够,
2.工作中移动端开发和后端开发总是不能并行进行,需要一个mock的依赖来让他们并行开发。
3.同时让自己锻炼去开发测试平台,掌握flask开发程序,提高自己的业务水平。
整体思路: 1.利用flask+bootstrap来进行web界面开发,对接口,接口测试用例,定时任务,测试报告的持续集成。
2.IAPTest支持接口用例管理,接口多用例测试,支持定时测试任务,测试报告持久化
3.目前mock服务支持单一path,定时任务可以开启暂停多用例执行,定时任务执行后自动发送测试报告,多用例的单次执行,单接口的调试功能。对测试环境的管理
下面来看下最后的效果图,以及附上github开源地址。
测试环境管理界面:
定时任务界面:
mock界面
测试报告界面
用例管理界面
接口管理界面
**核心代码分享区:**
定时任务对应视图开发
classAddtimingtaskView(MethodView): @login_requireddefget(self):returnrender_template('addtimingtasks.html') @login_requireddefpost(self): taskname=request.form['taskname'] tinmingtime=request.form['time'] to_email_data=request.form['to_email'] cao_email=request.form['cao_email'] weihu=request.form['weihu']iftaskname =='': flash('任务名不能为空!')returnrender_template('addtimingtasks.html')iftinmingtime =='': flash('任务执行时间不能为空!')returnrender_template('addtimingtasks.html')ifto_email_data=='': flash('发送给谁邮件不能为空!')returnrender_template('addtimingtasks.html')ifweihu=='': flash('维护人邮件不能为空!')returnrender_template('addtimingtasks.html') taskname_is= Task.query.filter_by(taskname=taskname).first()iftaskname_is: flash('任务已经存在请重新填写!')returnrender_template('addtimingtasks.html') new_task=Task(taskname=taskname,taskstart=tinmingtime,taskrepor_to=to_email_data,taskrepor_cao=cao_email,task_make_email=weihu, makeuser=current_user.id) db.session.add(new_task)try: db.session.commit() flash('添加定时任务成功')returnredirect(url_for('timingtask'))exceptException as e: db.session.rollback() flash('添加过程貌似异常艰难!')returnredirect(url_for('addtimingtasks'))returnrender_template('addtimingtasks.html')classEditmingtaskview(MethodView): @login_requireddefget(self,id): task_one=Task.query.filter_by(id=id).first() procjet=Project.query.all()ifnottask_one: flash('你编辑的不存在')returnredirect(url_for('timingtask'))returnrender_template('Edittimingtasks.html',task_one=task_one,porjects=procjet)defpost(self,id): task_one= Task.query.filter_by(id=id).first() procjet=Project.query.all() taskname= request.form['taskname'] tinmingtime= request.form['time'] to_email_data= request.form['to_email'] cao_email= request.form['cao_email'] weihu= request.form['weihu']iftaskname =='': flash('任务名不能为空!')returnrender_template('addtimingtasks.html')iftinmingtime =='': flash('任务执行时间不能为空!')returnrender_template('addtimingtasks.html')ifto_email_data=='': flash('发送给谁邮件不能为空!')returnrender_template('addtimingtasks.html')ifweihu=='': flash('维护人邮件不能为空!')returnrender_template('addtimingtasks.html') task_one.taskname=taskname task_one.taskrepor_to=to_email_data task_one.taskrepor_cao=cao_email task_one.task_make_email=weihu task_one.makeuser=current_user.idtry: db.session.commit() flash('编辑成功')returnredirect(url_for('timingtask'))except: db.session.rollback() flash('编辑出现问题!')returnredirect(url_for('timingtask'))returnrender_template('Edittimingtasks.html', task_one=task_one,porjects=procjet)classDeteleTaskViee(MethodView):defget(self,id): task_one= Task.query.filter_by(id=id).first()ifnottask_one: flash('你编辑的不存在')returnredirect(url_for('timingtask'))iftask_one.status==True: flash('已经删除')returnredirect(url_for('timingtask')) task_one.status=Truetry: db.session.commit() flash('删除任务成功')returnredirect(url_for('timingtask'))except: db.session.rollback() flash('删除任务休息了')returnredirect(url_for('timingtask')) @app.route('/gettest',methods=['POST']) @login_requireddefgettest():#ajax获取项目的测试用例projec=(request.get_data('project')).decode('utf-8')ifnotprojec:return[] proje=Project.query.filter_by(project_name=str(projec)).first()ifnotproje:return[] testyong=InterfaceTest.query.filter_by(projects_id=proje.id).all() testyong_list=[]foriintestyong: testyong_list.append({'name':i.Interface_name,'id':i.id})returnjsonify({'data':testyong_list})classTestforTaskView(MethodView):#为测试任务添加测试用例defget(self,id): procjet=Project.query.all() task_one=Task.query.filter_by(id=id).first()returnrender_template('addtestyongfortask.html',task_one=task_one,procjets=procjet)defpost(self,id): procjet=Project.query.all() task_one= Task.query.filter_by(id=id).first() proc_test=request.form.get('project')ifproc_test =='': flash(u'不能不添加测试项目!')returnrender_template('addtestyongfortask.html', task_one=task_one, procjets=procjet) test_yongli=request.form.getlist('testyongli')iftest_yongli=='': flash(u'亲你见过只有测试项目没有测试用例的测试任务吗!')returnrender_template('addtestyongfortask.html', task_one=task_one, procjets=procjet)foroldtaskintask_one.interface.all(): task_one.interface.remove(oldtask) task_one.prject=Project.query.filter_by(project_name=proc_test).first().idforyongliintest_yongli: task_one.interface.append(InterfaceTest.query.filter_by(id=yongli).first()) db.session.add(task_one)try: db.session.commit() flash('任务更新用例成功')returnredirect(url_for('timingtask'))except: flash('任务更新用例失败')returnredirect(url_for('timingtask'))returnrender_template('addtestyongfortask.html', task_one=task_one, procjets=procjet)classStartTaskView(MethodView):#开始定时任务@login_requireddefget(self,id): task=Task.query.filter_by(id=id).first()iflen(task.interface.all())<=1: flash('定时任务执行过程的测试用例为多用例,请你谅解')returnredirect(url_for('timingtask'))try: scheduler.add_job(func=addtask, id=str(id), args=str(id),trigger=eval(task.taskstart),replace_existing=True) task.yunxing_status='启动'db.session.commit() flash(u'定时任务启动成功!')returnredirect(url_for('timingtask'))exceptException as e: flash(u'定时任务启动失败!请检查任务的各项内容各项内容是否正常')returnredirect(url_for('timingtask'))classZantingtaskView(MethodView):#暂停定时任务@login_requireddefget(self,id): task= Task.query.filter_by(id=id).first()try: scheduler.pause_job(str(id)) task.yunxing_status='暂停'db.session.commit() flash(u'定时任务暂停成功!')returnredirect(url_for('timingtask'))except: task.yunxing_status='创建'db.session.commit() flash(u'定时任务暂停失败!已经为您初始化')returnredirect(url_for('timingtask'))classHuifutaskView(MethodView):#回复定时任务@login_requireddefget(self,id): task= Task.query.filter_by(id=id).first()try: scheduler.resume_job(str(id)) task.yunxing_status='启动'db.session.commit() flash(u'定时任务恢复成功!')returnredirect(url_for('timingtask'))except: task.yunxing_status='创建'db.session.commit() flash(u'定时任务恢复失败!已经为您初始化')returnredirect(url_for('timingtask'))classYichuTaskView(MethodView):#移除定时任务@login_requireddefget(self,id): task= Task.query.filter_by(id=id).first()try: scheduler.delete_job(str(id)) task.yunxing_status='关闭'db.session.commit() flash(u'定时任务移除成功!')returnredirect(url_for('timingtask'))except: task.yunxing_status='创建'db.session.commit() flash(u'定时任务移除失败!已经为您初始化')returnredirect(url_for('timingtask'))
定时任务所执行的func代码
defaddtask(id):#定时任务执行的时候所用的函数in_id=int(id) task=Task.query.filter_by(id=in_id).first() starttime=datetime.datetime.now() star=time.time() day= time.strftime("%Y%m%d%H%M", time.localtime(time.time())) basedir= os.path.abspath(os.path.dirname(__file__)) file_dir= os.path.join(basedir,'upload') file= os.path.join(file_dir, (day +'.log'))ifos.path.exists(file)isFalse: os.system('touch %s'%file) filepath= os.path.join(file_dir, (day +'.html'))ifos.path.exists(filepath)isFalse: os.system(r'touch %s'%filepath) projecct_list=[] model_list=[] Interface_name_list=[] Interface_url_list=[] Interface_meth_list=[] Interface_pase_list=[] Interface_assert_list=[] Interface_headers_list=[] id_list=[]fortask_yongliintask.interface.all(): id_list.append(task_yongli.id) projecct_list.append(task_yongli.projects) model_list.append(task_yongli.models) Interface_url_list.append(task_yongli.Interface_url) Interface_name_list.append(task_yongli.Interface_name) Interface_meth_list.append(task_yongli.Interface_meth) Interface_pase_list.append(task_yongli.Interface_pase) Interface_assert_list.append(task_yongli.Interface_assert) Interface_headers_list.append(task_yongli.Interface_headers) apitest=ApiTestCase(Interface_url_list, Interface_meth_list, Interface_pase_list, Interface_assert_list, file, Interface_headers_list) result_toal, result_pass, result_fail, relusts, bask_list=apitest.testapi() endtime=datetime.datetime.now() end=time.time() createHtml(titles=u'接口测试报告', filepath=filepath, starttime=starttime, endtime=endtime, passge=result_pass, fail=result_fail, id=id_list, name=projecct_list, headers=Interface_headers_list, coneent=Interface_url_list, url=Interface_meth_list, meth=Interface_pase_list, yuqi=Interface_assert_list, json=bask_list, relusts=relusts) hour= end -star user_id= User.query.filter_by(role_id=2).first().id new_reust= TestResult(Test_user_id=user_id, test_num=result_toal, pass_num=result_pass, fail_num=result_fail, test_time=starttime, hour_time=hour, test_rep=(day +'.html'), test_log=(day +'.log')) email= EmailReport.query.filter_by(role_id=2, default_set=True).first() send_emails(sender=email.send_email, receivers=task.taskrepor_to, password=email.send_email_password, smtp=email.stmp_email, port=email.port, fujian1=file, fujian2=filepath, subject=u'%s自动用例执行测试报告'%day, url='%stest_rep'%(request.url_root)) db.session.add(new_reust) db.session.commit()
mock服务的一个请求方式的代码
1classMakemockserverView(MethodView):#做一个mock服务2defget(self,path):#get请求方法3huoqupath=Mockserver.query.filter_by(path=path,status=True).first()4heders=request.headers5method=request.method6ifnothuoqupath:7abort(404)8ifmethod.lower() !=huoqupath.methods:9returnjsonify({'code':'-1','message':'请求方式错误!','data':''})10ifhuoqupath.is_headers==True:11ifcomp_dict(heders,huoqupath.headers) ==True:12ifhuoqupath.ischeck==True:13paerm =request.values.to_di
TAG:
标题搜索
日历
|
|||||||||
日 | 一 | 二 | 三 | 四 | 五 | 六 | |||
1 | 2 | 3 | 4 | ||||||
5 | 6 | 7 | 8 | 9 | 10 | 11 | |||
12 | 13 | 14 | 15 | 16 | 17 | 18 | |||
19 | 20 | 21 | 22 | 23 | 24 | 25 | |||
26 | 27 | 28 | 29 | 30 | 31 |
我的存档
数据统计
- 访问量: 13234
- 日志数: 13
- 建立时间: 2016-04-14
- 更新时间: 2019-11-07