多线程运行JMeter脚本

发表于:2018-6-12 11:18  作者:zww1   来源:博客园

字体: | 上一篇 | 下一篇 |我要投稿 | 推荐标签: 性能测试工具 软件测试工具 Jmeter

  解决问题:之前对数据库Jmeter要做一个压力测试的时候,由于有多个Jmeter脚本且希望所有脚本都是在同一时间运行。
  解决方法:所以用python写了一个多线程运行的小脚本来执行多个jmk脚本。
  一、框架结构以及各种方法的说明:
  二、各文件的方法:
  (1)TxtOperating.py,文件操作:
  (2)CpuMonitor.py,监控系统cpu、内存以及磁盘情况
  (3)StartJmeter.py:
  三、具体代码:
  (1)CpuMonitor.py
# -*- coding: utf-8 -*-
'''
Created on 2018年6月6日
@author: zww
监控系统cpu、内存以及磁盘情况,提供屏幕输出以及写入文本
'''
import time,psutil
from handler import TxtOperating as TxtOp
cpu = {'user':0,'system':0,'idle':0,'percent':0}
memory = {'total':0,'available':0,'percent':0,'used':0,'free':0}
#将磁盘单位转换成G
def change2G(disk_memory):
return str(round((disk_memory/1024/1024/1024)))
def getCpuInfo():
cpu_info = psutil.cpu_times()
cpu['user'] = cpu_info.user
cpu['system'] = cpu_info.system
cpu['idle'] = cpu_info.idle
cpu['percent'] = psutil.cpu_percent(interval=1)
def getMemoryInfo():
memory_info = psutil.virtual_memory()
memory['total'] = change2G(memory_info.total)
memory['available'] = change2G(memory_info.available)
memory['percent'] = memory_info.percent
memory['used'] = change2G(memory_info.used)
memory['free'] = change2G(memory_info.percent)
def getDiskInfo():
#磁盘名称
disk_id = []
#将每个磁盘的total used free percent 分别存入到相应的list
disk_total = []
disk_used = []
disk_free = []
disk_percent = []
for id in psutil.disk_partitions():
if 'cdrom' in id.opts or id.fstype == '':
continue
disk_name = id.device.split(':')
s = disk_name[0]
disk_id.append(s)
disk_info = psutil.disk_usage(id.device)
disk_total.append(change2G(disk_info.total))
disk_used.append(change2G(disk_info.used))
disk_free.append(change2G(disk_info.free))
disk_percent.append(disk_info.percent)
return disk_id,disk_total,disk_used,disk_free,disk_percent
def formating(disk_id,disk_total,disk_used,disk_free,disk_percent):
all_str = ''
length_of_disk = len(disk_id)
for i in range(length_of_disk):
str = '磁盘 %s:总内存量 %sG,已用 %sG,剩余 %sG,使用率为%s%%'%(disk_id[i],disk_total[i],disk_used[i],disk_free[i],disk_percent[i])
# print(str) #需要输出在命令台则取消注释
all_str = ''.join([all_str,str,'\n'])
return all_str
def run():
while True:
runtime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
getCpuInfo()
getMemoryInfo()
disk_id,disk_total,disk_used,disk_free,disk_percent = getDiskInfo()
cpu_str = ''.join(['CPU使用率:',str(cpu['percent'])])
memory_str = '內存 :总内存量 %sG,已用 %sG,剩余 %sG,使用率为%s%%'%(memory['total'],memory['used'],memory['available'],memory['percent'])
disk_str = formating(disk_id,disk_total,disk_free,disk_used,disk_percent)
str_all = ''.join([runtime,'\n',cpu_str,'\n',memory_str,'\n',disk_str])
print(str_all)
#构造监控文本存放的路径和命名
cur_dir = os.path.abspath(os.path.dirname(__file__))
run_time = time.strftime('%Y%m%d_%H%M%S',time.localtime())
result_path = ''.join([cur_dir,'\\MonitorResult\\',run_time,'sysMonitor.txt'])
TxtOp.write(str_all,result_path)
time.sleep(3)
  (2)StartJmeter.py:
# -*- coding: utf-8 -*-
'''
Created on 2018年6月6日
@author: zww
多线程启动jmeter脚本
'''
import os,time
import threading
#获取当前目录下的所有文件
def getJmeterFile(filepath='D:\\JmeterFiles'):
for root,dirs,files in os.walk(filepath):
return root,files
#过滤文件,只筛选‘.jmx’的文件
def filterFile(root,files):
after_files = []
for file in files:
if file.endswith('.jmx'):
after_files.append(''.join([root,'\\',file]))
return after_files
#运行jmeter脚本
def startJmeter(casename):
runtime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
comd = ''.join(['jmeter -n -t',casename,' -l',runtime,casename,'.jtl'])
os.system(comd)
#多线程运行
def run():
root,files = getJmeterFile()
excutefiles = filterFile(root,files)
for file in excutefiles:
t = threading.Thread(target=startJmeter,args=(file,))
#t.setDaemon() #后台运行
t.start()
  (3)TxtOperating.py:
# -*- coding: utf-8 -*-
'''
Created on 2018年6月6日
@author: zww
读取或者写入文本文件
'''
def write(filestr,filepath='D:\\AutoPressMonitor\\sysMonitor.txt'):
str = ''.join([filestr,'\n'])
with open(filepath,'a') as fp:
fp.write(str)
fp.close()
  (4)bin.py:
# -*- coding: utf-8 -*-
'''
Created on 2018年6月6日
@author: zww
'''
from interface import CpuMonitor,StartJmeter
if __name__ == '__main__':
# StartJmeter.run()
CpuMonitor.run()
上文内容不用于商业目的,如涉及知识产权问题,请权利人联系博为峰小编(021-64471599-8017),我们将立即处理。

Python+Selenium大型电商项目(京东商城)实战直播,优惠名额抢占中>>

评 论

论坛新帖

顶部 底部


建议使用IE 6.0以上浏览器,800×600以上分辨率,法律顾问:上海瀛东律师事务所 张楠律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2018, 沪ICP备05003035号
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪公网安备 31010102002173号

51Testing官方微信

51Testing官方微博

扫一扫 测试知识全知道