关闭

车载测试之python调用CANoe

发表于:2024-8-21 09:21

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:雪语.    来源:CSDN

  Canoe主要用来做车载测试的,想了解Canoe的可以自行查找,这篇主要是用于在我们自动化测试中,我们使用python调用canoe的各种功能,如设置系统环境变量,启动测量等等
  首先我们要理解python调用canoe底层是因为Vector公司开放了一些com口,python主要是调用这些com口来实现的,具体有哪些可以打开canoe软件点击右上角问号帮助文档住查看
  代码如下
  import os
  import time
  import msvcrt
  from win32com.client import *
  from win32com.client.connect import *
  def DoEvents():
      pythoncom.PumpWaitingMessages()
      time.sleep(.1)
  def DoEventsUntil(cond):
      while not cond():
          DoEvents()
  # Vector Canoe Class
  class CANoe:
      Started = False
      Stopped = False
      ConfigPath = ""
   
      def __init__(self, visible=True):
          """
          产生一个canoe的com对象,启动了canoe软件,但是未加载任何工程
          :param visible: 其实可以设置canoe对于cfg文件是否保存
          """
          self.application = None
          self.application = DispatchEx("CANoe.Application")
          # self.application.Versible = visible
          self.ver = self.application.Version
          print('Loaded CANoe version ',
                self.ver.major, '.',
                self.ver.minor, '.',
                self.ver.Build, '...')
          # 这里是可以获取到canoe工程是否在启动,这个数值是True或者False
          self.Measurement = self.application.Measurement.Running
   
      def open_cfg(self, cfg_path_file):
          """
          加载cfg工程文件
          :param cfg_path_file:cfg文件的路径,这是是canoe加载cfg文件成功
          :return:
          """
          cfg_path_file = os.path.abspath(cfg_path_file)
          self.ConfigPath = cfg_path_file
          if (self.application != None):
              if os.path.isfile(cfg_path_file) and (os.path.splitext(cfg_path_file)[1] == ".cfg"):
                  self.application.Open(cfg_path_file)
                  print("opening..." + cfg_path_file)
              else:
                  raise RuntimeError("Can't find CANoe cfg file")
          else:
              raise RuntimeError("CANoe Application is missing,unable to open simulation")
   
      def close_cfg(self):
          """
          退出canoe
          :return:
          """
          if (self.application != None):
              print("close cfg ...")
              # self.stop_Measurement()
              self.application.Quit()
              self.application = None
   
      def start_measurement(self):
          """
          启动测量
          :return:
          """
          retry = 0
          retry_counter = 5
          # try to establish measurement within 5s timeout
          while not self.application.Measurement.Running and (retry < retry_counter):
              self.application.Measurement.Start()
              time.sleep(1)
              retry += 1
          if (retry == retry_counter):
              raise RuntimeWarning("CANoe start measuremet failed, Please Check Connection!")
   
      def stop_measurement(self):
          """
          停止测量
          :return:
          """
          if self.application.Measurement.Running:
              self.application.Measurement.Stop()
          else:
              pass
   
      def get_sigval(self, channel_num, msg_name, sig_name, bus_type="CAN"):
          """
          获取信号的值
          :param channel_num:
          :param msg_name:
          :param sig_name:
          :param bus_type:
          :return:
          """
          if (self.application != None):
              result = self.application.GetBus(bus_type).GetSignal(channel_num, msg_name, sig_name)
              return result.Value
          else:
              raise RuntimeError("CANoe is not open,unable to GetVariable")
   
      def set_sigval(self, channel_num, msg_name, sig_name, bus_type, setValue):
          """
          设置信号的值
          :param channel_num:
          :param msg_name:
          :param sig_name:
          :param bus_type:
          :param setValue:
          :return:
          """
          if (self.application != None):
              result = self.application.GetBus(bus_type).GetSignal(channel_num, msg_name, sig_name)
              result.Value = setValue
          else:
              raise RuntimeError("CANoe is not open,unable to GetVariable")
   
      def get_EnvVar(self, var):
          if (self.application != None):
              result = self.application.Environment.GetVariable(var)
              return result.Value
          else:
              raise RuntimeError("CANoe is not open,unable to GetVariable")
   
      def set_EnvVar(self, var, value):
          result = None
          if (self.application != None):
              # set the environment varible
              result = self.application.Environment.GetVariable(var)
              result.Value = value
              checker = self.get_EnvVar(var)
              # check the environment varible is set properly?
              while (checker != value):
                  checker = self.get_EnvVar(var)
          else:
              raise RuntimeError("CANoe is not open,unable to SetVariable")
   
      def get_system_variable_value(self, sys_var_name):
          """获取系统环境变量的值
          参数:
              sys_var_name (str): "sys_var_demo::speed"
          Returns:
              返回该系统变量的值
          """
          namespace = '::'.join(sys_var_name.split('::')[:-1])
          variable_name = sys_var_name.split('::')[-1]
          return_value = None
          try:
              namespace_com_object = self.application.System.Namespaces(namespace)
              variable_com_object = namespace_com_object.Variables(variable_name)
              return_value = variable_com_object.Value
          except Exception as e:
              raise RuntimeError("CANoe is not open,unable to")
          return return_value
   
      def set_system_variable_value(self, ns_name, sysvar_name, var):
          """
          设置系统环境变量的值,这里对该值进行了判断,如果类型不对设置会直接报错
          :param ns_name:
          :param sysvar_name:
          :param var:
          :return:
          """
          namespace = '::'.join(sys_var_name.split('::')[:-1])
          variable_name = sys_var_name.split('::')[-1]
          try:
              namespace_com_object = self.application.System.Namespaces(namespace)
              variable_com_object = namespace_com_object.Variables(variable_name)
              if isinstance(variable_com_object.Value, int):
                  variable_com_object.Value = int(value)
              elif isinstance(variable_com_object.Value, float):
                  variable_com_object.Value = float(value)
              else:
                  variable_com_object.Value = value
              self.log.info(f'system variable({sys_var_name}) value set to -> {value}.')
          except Exception as e:
              self.log.info(f'failed to set system variable({sys_var_name}) value. {e}')
   
      def load_test_setup(self, testsetup):
          """
          这里方法是通过一个tse文件来加载一个testmode,如果要在canoe中找到的话,可以打开test界面找到
          我们的测试项目可以直接打开路径找到tse文件
          :param testsetup:
          :return:
          """
          self.TestSetup = self.App.Configuration.TestSetup
          path = os.path.join(self.ConfigPath, testsetup)
          print("传入的tse路径:", path)
          # 如果目标 tse 已存在,直接读取,否则添加它,如果已经存在,直接add的话会报错
          tse_count = self.TestSetup.TestEnvironments.Count
          print("add前tse数量:", tse_count)
          _existTse = False
          for _index_tse in range(1, tse_count + 1):
              """
              这里是为了保证我们传入tse如果已经被canoe加载,那么在Add path的时候会报错,所以一定要这样判断
              """
              if self.TestSetup.TestEnvironments.Item(_index_tse).FullName == path:
                  testenv = self.TestSetup.TestEnvironments.Item(_index_tse)
                  _existTse = True
                  break
          if _existTse == False:
              testenv = self.TestSetup.TestEnvironments.Add(path)
   
          print("add后tse数量:", self.TestSetup.TestEnvironments.Count)
   
          testenv = CastTo(testenv, "ITestEnvironment2")
          # TestModules property to access the test modules
          self.TestModules = []
          self.TraverseTestItem(testenv, lambda tm: self.TestModules.append(CanoeTestModule(tm)))
   
      def TraverseTestItem(self, parent, testf):
          for test in parent.TestModules:
              testf(test)
          for folder in parent.Folders:
              found = self.TraverseTestItem(folder, testf)
   
      def run_all_test_models(self):
          """ 启动所有测试模块,直到所有测试停止再结束这个函数"""
          # start all test modules
          for tm in self.TestModules:
              tm.Start()
          # wait for test modules to stop
          while not all([not tm.Enabled or tm.IsDone() for tm in self.TestModules]):
              DoEvents()
   
      def run_one_test_model(self, tese_model_name):
          """ 启动某个测试模块,直到所有测试停止再结束这个函数"""
          # 启动指定名称的测试模块
          for tm in self.TestModules:
              if tm.Name == tese_model_name:
                  tm.Start()
          # 等待指定名称的测试模块完成或禁用
          for tm in self.TestModules:
              if tm.Name == tese_model_name:
                  while not all([not tm.Enabled or tm.IsDone()]):
                      DoEvents()
              else:
                  continue
   
   
  class CanoeTestModule:
      """Wrapper class for CANoe TestModule object"""
   
      def __init__(self, tm):
          self.tm = tm
          self.Events = DispatchWithEvents(tm, CanoeTestEvents)
          self.Name = tm.Name
          self.IsDone = lambda: self.Events.stopped
          self.Enabled = tm.Enabled
   
      def Start(self):
          if self.tm.Enabled:
              self.tm.Start()
              self.Events.WaitForStart()
   
   
  class CanoeTestEvents:
      """Utility class to handle the test events"""
   
      def __init__(self):
          self.started = False
          self.stopped = False
          self.WaitForStart = lambda: DoEventsUntil(lambda: self.started)
          self.WaitForStop = lambda: DoEventsUntil(lambda: self.stopped)
   
      def OnStart(self):
          self.started = True
          self.stopped = False
          print("<", self.Name, " started >")
   
      def OnStop(self, reason):
          self.started = False
          self.stopped = True
          print("<", self.Name, " stopped >")
   
   
  class CanoeMeasurementEvents(object):
      """Handler for CANoe measurement events"""
   
      def OnStart(self):
          CanoeSync.Started = True
          CanoeSync.Stopped = False
          print("< measurement started >")
   
      def OnStop(self):
          CanoeSync.Started = False
          CanoeSync.Stopped = True
          print("< measurement stopped >")
  在这段代码中实现了canoe的一些基本功能,我们我们可以直接使用,但是注意如果是在子线程中调用canoe需要自己实现com口的初始化
  import pythoncom
   
  pythoncom.CoInitialize()  # 初始化COM口
  canoe_obj = None
  try:
      canoe_obj = CANoe()
      canoe_obj.open_cfg('path')
      
      # 在finally块中添加条件检查,确保对象存在再调用方法
      if canoe_obj:
          canoe_obj.stop_measurement()
          canoe_obj.close_cfg()
  finally:
      pythoncom.CoUninitialize()  # 清理COM口
   这样我们可以在子线程中使用canoe的调用,是否是会报错的。
  关于canoe的调用还有一个第三方的包:py_canoe
  在实际中这个包会出现一个问题,就是重复多次的关闭canoe会报错,或者重复多次加载不同的cfg文件也会报错
  角色认证
  如果你涉及到canoe的角色认证,其实可以通过设置启动系统环境变量来实现,因为他的角色认证都是跟系统环境变量相关,如local认证,你点击灯会变绿,都是都是对应的系统环境变量的值在发生改变,设置会相同的就可以实现认证。
  本文内容不用于商业目的,如涉及知识产权问题,请权利人联系51Testing小编(021-64471599-8017),我们将立即处理
《2024软件测试行业从业人员调查问卷》,您的见解,行业的声音!

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号