通过python-can实现针对ECU内部负载测试工具

发表于:2024-4-12 09:28

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:名字太俊不提也罢    来源:CSDN

  前言
  之前公司大佬希望我做个总线针对ECU内部负载测试工具,尝试通过python自动化生成CAPL脚本,发送周期发送极不稳定,咨询过后是CAPL本身的问题,遂搁置此问题,今天正好有时间,把关于这部分的实现一下,记录一下。(代码基于PCAN实现,测试过程中会有发送队列满的问题,后续有时间再处理)
  一、针对ECU内部代码的测试
  将ECU内部需要接受的相关报文,按照对应的周期进行发送,检测代码在这种负载情况下相关逻辑是否有问题。
  二、具体代码
  代码如下(示例):
  import random
  import threading
  import time
  import can
  from typing import Optional, List
  from can.bit_timing import BitTimingFd
  from queue import Queue
  class CanTool:
      def __init__(self):
          self.__bus = None
          self.__notifier = None
          self.__is_fd = False
          self.__send_queue = Queue()
          self.flag_dict = dict()
          self.queue_dict = dict()
      def __notifier_init(self):
          """
          功能:初始化报文消息得回调函数
          """
          logger = can.Logger("logfile.asc")
          listeners = [
              logger,  # 保存log日志,asc格式
          ]
          self.__notifier = can.Notifier(self.__bus, listeners)
      def bus_init(self, device: Optional[str], channel: Optional[str], bitrate: Optional[int] = 125000, is_fd=False):
          """
          功能:初始化总线设备
          参数1:设备名。str
          参数2:通道,str
          参数3:波特率,int
          """
          if is_fd:
              timingFD = BitTimingFd(
                  f_clock=80000000,  # 时钟频率,有MHz和Hz,注意区分
                  nom_brp=2,  # 仲裁场分频系数
                  nom_tseg1=63,  # 仲裁场TSeg1
                  nom_tseg2=16,  # 仲裁场TSeg2
                  nom_sjw=16,  # 仲裁场同步跳变宽度
                  data_brp=2,  # 数据场分频系数
                  data_tseg1=15,  # 数据场TSeg1
                  data_tseg2=4,  # 数据场TSeg2
                  data_sjw=4)  # 数据场分频系数
              self.__bus = can.interface.Bus(bustype='pcan', channel='PCAN_USBBUS1', fd=True, timing=timingFD)
              self.__notifier_init()
              self.__is_fd = True
          else:
              try:
                  self.__bus = can.interface.Bus(bustype=device, channel=channel, bitrate=bitrate)
                  self.__notifier_init()
                  return True
              except Exception as e:
                  print(e)
                  return False
      def bus_close(self):
          """
          功能:结束总线收发,回收资源
          """
          self.__notifier.stop()
          self.__bus.shutdown()
      def __send_msg(self, msg_id: Optional[str], msg_data: Optional[list[int]]):
          """
          功能:发送单独一帧数据
          参数1:需要发送得报文id,str
          参数2:需要发送得数据列表,list[int]
          """
          msg_id = int(msg_id)
          if msg_id < 2047:
              msg = can.Message(arbitration_id=msg_id, is_extended_id=False, data=msg_data, is_fd=self.__is_fd)
          else:
              msg = can.Message(arbitration_id=msg_id - 0x80000000, is_extended_id=True, data=msg_data,
                                is_fd=self.__is_fd)
          self.__bus.send(msg)
      def write_msg_thread(self, cycle_time, msg_list):
          """
          功能:开启一个发送报文线程,向发送队列中写入数据
          参数1:报文周期
          参数2:支持此报文周期的报文列表
          """
          write_thread = threading.Thread(target=self.write_msg, args=(cycle_time, msg_list))
          write_thread.start()
      def write_msg(self, cycle_time, msg_list):
          """
          功能:向报文发送队列写入数据
          参数1:报文发送周期
          参数2:支持此报文周期的报文列表
          """
          if cycle_time == '0':
              while self.flag_dict[cycle_time]:
                  for msg in msg_list:
                      msg_dict = dict()
                      msg_data = list()
                      for _ in range(int(msg.dlc)):
                          msg_data.append(random.randint(1, 255))
                      msg_dict['id'] = msg.can_id
                      msg_dict['data'] = msg_data
                      self.__send_queue.put(msg_dict)
                  time.sleep(random.uniform(0, 0.5))
          else:
              while self.flag_dict[cycle_time]:
                  for msg in msg_list:
                      msg_dict = dict()
                      msg_data = list()
                      for _ in range(int(msg.dlc)):
                          msg_data.append(random.randint(1, 255))
                      msg_dict['id'] = msg.can_id
                      msg_dict['data'] = msg_data
                      self.__send_queue.put(msg_dict)
                  time.sleep(int(cycle_time) / 1000)
      def load_test(self, time_dict):
          """
          功能:开启ECU负载测试功能
          参数1:记录对应报文周期的字典,key为报文周期,value为msg列表
          """
          self.thread_time_send()
          for cycle_time, msg_list in time_dict.items():
              self.flag_dict[cycle_time] = True
              self.write_msg_thread(cycle_time, msg_list)
      def thread_time_send(self):
          """
          功能:开启一个报文发送线程
          """
          cycle_time_thread = threading.Thread(target=self.cycle_time_send)
          cycle_time_thread.start()
      def cycle_time_send(self):
          """
          功能:周期从队列中获取报文数据进行发送
          """
          while True:
              msg = self.__send_queue.get()
              self.__send_msg(msg["id"], msg['data'])
              self.__bus.reset()
      def stop_load_test(self):
          """
          功能:停止ECU负载测试
          """
          for cycle_time,flag in self.flag_dict.items():
              self.flag_dict[cycle_time] = False
  本文内容不用于商业目的,如涉及知识产权问题,请权利人联系51Testing小编(021-64471599-8017),我们将立即处理
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号