用Python同时启动多个Appium,并让多个手机同时执行脚本

发表于:2021-8-17 09:36

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:我去热饭    来源:掘金

#
Appium
  首先你要连接好多个手机,设置好已经连接好的手机的ip列表ipp = ['192xxx','192xxx'],杀掉之前所有appium进程subprocess.Popen('killall node',shell = True)。
  声明一个方法def start(ip):调用时需要把ipp传入,在方法中写for循环,循环内循环内用传入的ip来启动appium服务,注意要用不同的端口号启动,相差2以上。在循环中调用webdriver.Remote方法创建driver对象。在循环中创建脚本类的实例时,请在循环中创建,并且传入driver。
  声明多线程列表
  在循环中用多线程创建该实例的方法的对象,并加入多线程列表。在main函数中,先调用start方法,填充好多线程的列表。然后再用for循环启动多线程列表。
  下面上start方法代码:
  def start(ip):
  for i in range(len(ip)):
  subprocess.Popen(‘/Applications/Appium.app/Contents/Resources/node/bin/node /Applications/Appium.app/Contents/Resources/node_modules/appium/build/lib/main.js –address “127.0.0.1” -p “’+str(4723+2*i)+’” –command-timeout “100” –automation-name “Appium” -U “’+ip[i]+’:’+str(5555+i)+’” >/tmp/1.txt’,shell=True)
  time.sleep(3.5)
  wzj = webdriver.Remote(‘http://localhost:’+str(4723+2*i)+’/wd/hub’, desired_caps)
  dingwei = DingWei(wzj, name)
  t = threading.Thread(target=dingwei.begin)
  tt.append(t)
  上面的DingWei 是我的脚本类,其中的begin是类中的启动脚本的方法。下面上主函数的代码:
  if name == “main“:
  start(ipp)
  time.sleep(2)
  for i in tt:
  i.start()
  下面是全套脚本:拿去用吧,自己改改。
  # -*- coding:utf-8 -*-
  from __future__ import division
  import time
  import datetime,subprocess
  import unittest
  import threading
  from appium import webdriver
  import os
  import xlrd,xlwt,xlutils
  from xlutils.copy import copy
  import wx
  from HTMLTestRunner import HTMLTestRunner
  from appium.webdriver.common.touch_action import TouchAction
  from appium.webdriver.common.touch_action import TouchAction
  desired_caps = {}
  desired_caps['platformName'] = 'Android'
  desired_caps['platformVersion'] = '5.1.1'
  desired_caps['deviceName'] = 'hahaha'
  desired_caps['appPackage'] = 'com.........blehunter.debug'
  desired_caps['appActivity'] = 'com.........blehunter.ui.WelcomeActivity'
  # desired_caps["unicodeKeyboard"] = "True"
  # desired_caps["resetKeyboard"] = "True"
  subprocess.Popen('killall node',shell=True)
  ipp = ['192.168.0.171','192.168.0.175']
  tt = []   #多线程对象
  def server(ip):
      for i in range(len(ip)):
          name = "./new/"+str(ipp[i])+"*****"+time.ctime()+".xls"
          SZ = xlrd.open_workbook('./new/0.xls')
          ww = copy(SZ)
          ww.save(name)
          subprocess.Popen('/Applications/Appium.app/Contents/Resources/node/bin/node /Applications/Appium.app/Contents/Resources/node_modules/appium/build/lib/main.js --address "127.0.0.1" -p "'+str(4723+2*i)+'" --command-timeout "100"  --automation-name "Appium" -U "'+ip[i]+':'+str(5555+i)+'" >/tmp/1.txt',shell=True)
          time.sleep(3.5)
          wzj = webdriver.Remote('http://localhost:'+str(4723+2*i)+'/wd/hub', desired_caps)
          dingwei = DingWei(wzj, name)
          t = threading.Thread(target=dingwei.begin)
          tt.append(t)
  class DingWei():
      def __init__(self,www,fname):
          self.wzj = www
          self.num = 0
          self.fname = fname
          self.SZ = xlrd.open_workbook(self.fname)
          self.sz = self.SZ.sheet_by_name("sheet1")
          self.ww = copy(self.SZ)
          self.nrows = self.sz.nrows
      def id(self,s):
          return self.wzj.find_element_by_id(s)
      ################
      def first_start(self):
          time.sleep(4)
          for i in range(7):
              try:
                  self.wzj.swipe(1000, 1500, 180, 1500)
              except:
                  self.wzj.swipe(400,706,90,706)
              time.sleep(0.2)
          time.sleep(3)
          try:
              self.wzj.find_element_by_id("com.........blehunter.debug:id/bt_guide_login").click()
          except:
              pass
          time.sleep(4)
          self.id('com.........blehunter.debug:id/et_phone').send_keys('18810437161')
          self.id('com.........blehunter.debug:id/et_phone_password').send_keys('qwerty')
          self.id('com.........blehunter.debug:id/btn_login').click()
          time.sleep(5)
      def chushihua(self):
          while 1:
              if self.m.is_enabled():
                  try:
                      self.id('com.........blehunter.debug:id/tv_location_failed')
                  except:
                      pass
                  break
              else:
                  pass
      def panduan(self,no):
          while 1:
              if self.m.is_enabled():
                  try :
                      self.id('com.........blehunter.debug:id/tv_location_failed')
                      self.ww.get_sheet(0).write(no+1, 1, "failed")
                      os.remove(self.fname)
                      self.ww.save(self.fname)
                  except :
                      self.ww.get_sheet(0).write(no+1, 1, "success")
                      os.remove(self.fname)
                      self.ww.save(self.fname)
                      self.num += 1
                  break
              else:
                  pass
      def begin(self):
          #self.first_start()
          self.wzj.implicitly_wait(10)
          b = self.wzj.find_element_by_xpath('//android.support.v7.widget.RecyclerView/android.widget.FrameLayout[2]/android.widget.ImageView')
          b.click()
          print self.wzj,u"初始次... "
          self.m = self.id('com.........blehunter.debug:id/iv_refresh_friend_location')
          self.chushihua()
          j = 0
          time0 = datetime.datetime.now()
          #总次数
          n = input(u'请输入测试次数,必须为整数!')
          for i in range(n):
              j += 1
              time.sleep(1)
              time1 = datetime.datetime.now()
              self.id('com.........blehunter.debug:id/iv_refresh_friend_location').click()
              self.panduan(i)
              time2 = datetime.datetime.now()
              self.ww.get_sheet(0).write(i+1, 2, "j")
              self.ww.get_sheet(0).write(i+1, 3, str(self.num))
              self.ww.get_sheet(0).write(i+1, 4, str(j-self.num))
              self.ww.get_sheet(0).write(i+1, 5, str((time2-time1).seconds))
              self.ww.get_sheet(0).write(i+1, 6, str(int((time2-time0).seconds)-j))
              os.remove(self.fname)
              self.ww.save(self.fname)
          self.ww.get_sheet(0).write(n+2, 0,str(((self.num)/n)*100)[:6]+'%' )
          os.remove(self.fname)
          self.ww.save(self.fname)
  if __name__ == "__main__":
      server(ipp)
      time.sleep(2)
      for i in tt:
          i.start()

      本文内容不用于商业目的,如涉及知识产权问题,请权利人联系51Testing小编(021-64471599-8017),我们将立即处理
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号