简单的性能自动化测试架构设计和实现(pylot)-python

发表于:2013-5-17 14:59

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:waterxi(cnblogs)    来源:51Testing软件测试网采编

  4、代理管理部分:

  代理管理的主要任务是创建几个重要数据的队列,实例化每个agent并为其创建新的线程。代理管理会管理这个线程池,并在这些中重要的队列中获取到测试的结果;

  重要代码:

     def run(self):
         self.running = True
         self.agents_started = False
         try:
             os.makedirs(self.output_dir, 0755)
         except OSError:
             self.output_dir = self.output_dir + time.strftime('/results_%Y.%m.%d_%H.%M.%S', time.localtime())
             try:
                os.makedirs(self.output_dir, 0755)
             except OSError:
                 sys.stderr.write('ERROR: Can not create output directory\n')
                 sys.exit(1)        
 
         # start thread for reading and writing queued results
         self.results_writer = ResultWriter(self.results_queue, self.output_dir)
         self.results_writer.setDaemon(True)
         self.results_writer.start()       
 
         for i in range(self.num_agents):
             spacing = float(self.rampup) / float(self.num_agents)
             if i > 0:  # first agent starts right away
                 time.sleep(spacing)
             if self.running:  # in case stop() was called before all agents are started
                 agent = LoadAgent(i, self.interval, self.log_msgs, self.output_dir, self.runtime_stats, self.error_queue, self.msg_queue, self.results_queue)
                 agent.start()
                 self.agent_refs.append(agent)
                 agent_started_line = 'Started agent ' + str(i + 1)
                 if sys.platform.startswith('win'):
                     sys.stdout.write(chr(0x08) * len(agent_started_line))  # move cursor back so we update the same line again
                     sys.stdout.write(agent_started_line)
                 else:
                     esc = chr(27) # escape key
                     sys.stdout.write(esc + '[G' )
                     sys.stdout.write(esc + '[A' )
                     sys.stdout.write(agent_started_line + '\n')
         if sys.platform.startswith('win'):
             sys.stdout.write('\n')
         print '\nAll agents running...\n\n'
         self.agents_started = True 

     def stop(self):
         self.running = False
         for agent in self.agent_refs:
             agent.stop()        
 
         if WAITFOR_AGENT_FINISH:
             keep_running = True
             while keep_running:
                 keep_running = False
                 for agent in self.agent_refs:
                     if agent.isAlive():
                         keep_running = True
                         time.sleep(0.1)
 
         self.results_writer.stop()

54/5<12345>
《2023软件测试行业现状调查报告》独家发布~

精彩评论

  • zdlhappy
    2014-5-09 09:43:23

    请教一个问题,我装好了所有需要装的,但是result.html上还是没有图片,日志里也没报错,请大神指点!!

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号