python实现多线程压测post接口服务
上一篇 /
下一篇 2017-04-06 22:50:36
/ 个人分类:python
2016-09-21 18:20 191人阅读 收藏 举报 版权声明:本文为博主原创文章,未经博主允许不得转载。
- import urllib
- import urllib2
- import random
- import sys
- import threading,time
- from time import sleep, ctime
- import json
- import time
- host_url='http://127.0.0.1:8070'
- def now() :
- return str( time.strftime( '%Y-%m-%d %H:%M:%S' , time.localtime() ) )
-
- def post1():
- print 'start loop', 'at:', now()
- start = time.clock()
- uid=random.randint(0, 2000000)
-
- url=host_url+'/bigdata/crm/getGroupResCount'
- values ={"query_where": [{"item": "uid","query": [{"op": "<","value": uid}]}]}
-
- jdata = json.dumps(values)
-
- req = urllib2.Request(url, jdata)
- req.add_header('Content-Type', 'application/json')
- response = urllib2.urlopen(req)
- end = time.clock()
- print "run: %f s" % (end - start)
- print response.read()
-
- def post2():
- print 'start loop', 'at:', now()
- start = time.clock()
- uid=random.randint(0, 2000000)
-
- url=host_url+'/bigdata/crm/generateDataFile'
- values ={"groupType": "dynamic","groupID": "123","query_where": [{"item": "uid","query": [{"op": "<","value": uid}]}]}
-
- jdata = json.dumps(values)
-
- req = urllib2.Request(url, jdata)
- req.add_header('Content-Type', 'application/json')
- response = urllib2.urlopen(req)
- end = time.clock()
- print "run: %f s" % (end - start)
- print response.read()
-
- def post():
- post1()
-
-
-
- def main():
- loop=int(sys.argv[1])
- ths=int(sys.argv[2])
-
- for i in xrange(loop):
- threadpool=[]
- for i in xrange(ths):
- th = threading.Thread(target= post)
- threadpool.append(th)
- for th in threadpool:
- th.start()
- for th in threadpool :
- threading.Thread.join( th )
-
-
-
- post()
- if __name__ == '__main__':
- main()
收藏
举报
TAG:
Python
多线程
接口
python