使用JMeter场景压测RocketMQ最佳实践

发表于:2024-1-08 09:17

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:森元    来源:稀土掘金

  需求背景
  新业务上线前,我们通常需要对系统的不同中间件进行压测,找到当前配置下中间件承受流量的上限,从而确定上游链路的限流规则,保护系统不因突发流量而崩溃。阿里云 PTS 的 JMeter 压测可以支持用户上传自定义的 JMeter 脚本,按照自定义的逻辑,借助 PTS 强大的分布式压测能力,对系统的不同中间件进行压测。下面,将以 JMeter5.5 和 RocketMQ5.0 系列为例,详细介绍如何使用 PTS 的 JMeter 场景压测 RocketMQ。
  前置条件
  1. 已在本地安装 JMeter。
  2. 已在阿里云 ECS 上部署 RocketMQ(本文选择的是一台 8C32G 规格的 ECS)。
  3. 已在阿里云上开通 PTS 服务。
  压测过程
  JMeter 提供了扩展性极强的 JavaSampler,我们可以通过继承 AbstractJavaSamplerClient 类来自定义在 JavaSampler 中执行的逻辑,从而实现对 RocketMQ 进行压测。
  步骤一:创建 Maven 项目,并引入依赖
  新建 Maven 工程,并在 pom 文件中引入下面的依赖:
  <dependency>
    <groupId>org.apache.jmeter</groupId>
    <artifactId>ApacheJMeter_java</artifactId>
    <version>5.5</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.5</version>
  </dependency>
  ApacheJMeter_java 是 JMeter JavaSampler 的依赖,rocketmq-client 是 RocketMQ 的客户端依赖(此处用 4.x 版本是因为 4.x 版本的客户端可以兼容 5.x 版本的服务端实例,但是 5.x 版本的客户端不能兼容 4.x 版本的服务端实例,可根据自己需求调整)。其中,要注意的是 ApacheJMeter_java 依赖的 scope 定义为? provided,JMeter 的 lib/ext 目录下已有该 JAR 包,因此不必将该依赖一起打包。
  在 pom 文件中引入 maven-assembly-plugin 插件,此处使用 “jar-with-dependencies” 打包方式,将项目所需依赖和项目代码打包到同一个 JAR 包,后续可以只上传该 JAR 包到 PTS 的 JMeter 环境中,不用上传多个依赖 JAR 包:
  <build>
    <finalName>jmeter-rocketmq4</finalName>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.4.2</version>
        <configuration>
          <!-- 打包方式 -->
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  步骤二:新建 AbstractJavaSamplerClient 的子类,并重写相关方法
  AbstractJavaSamplerClient 类继承了 JavaSamplerClient 接口,该接口包含 setupTest、runTest、teardownTest 和 getDefaultParameters 四个方法:
  · setupTest
  JMeter 将为测试中的每个线程创建一个 JavaSamplerClient 实现实例,测试开始时,将在每个线程的 JavaSamplerClient 实例上调用 setupTest 来初始化客户端,本例中即初始化 RocketMQ 的 producer。
  · runTest
  每个线程每次迭代会调用一次 runTest 方法,本例中,需要在 runTest 方法里面定义消息发送的方法和采样结果的设置逻辑。
  · teardownTest
  迭代完设置的次数或时间后,此方法将会被执行,本例中,需要在此方法关闭 producer。
  · getDefaultParameters
  此方法定义了参数列表,这些参数通过会 JavaSamplerContext 传递给上述方法方法,在此方法内定义的参数,可以在 JMeter JavaRequest Sampler 的 GUI 界面设置值,本例中,需要定义 RocketMQ 的 broker 地址、topic 名称、消息 key、消息内容等参数。
  新建子类参考如下:
  import java.nio.charset.StandardCharsets;
  import org.apache.jmeter.config.Arguments;
  import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
  import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
  import org.apache.jmeter.samplers.SampleResult;
  import org.apache.rocketmq.client.exception.MQBrokerException;
  import org.apache.rocketmq.client.exception.MQClientException;
  import org.apache.rocketmq.client.producer.DefaultMQProducer;
  import org.apache.rocketmq.client.producer.SendResult;
  import org.apache.rocketmq.common.message.Message;
  import org.apache.rocketmq.remoting.exception.RemotingException;
  public class JavaSamplerForRocketMQ extends AbstractJavaSamplerClient {
      private DefaultMQProducer producer;
      private static final String NAME_SRV_ADDRESS = "nameSrvAddress";
      private static final String TOPIC = "topic";
      private static final String PRODUCER_GROUP = "producer group";
      private static final String MSG_BODY = "messageBody";
      private static final String MSG_KEY = "messageKey";
      private static final String MSG_TAG = "messageTag";
      private static final String ERROR_CODE = "500";
      @Override
      public void setupTest(JavaSamplerContext javaSamplerContext) {
          try {
              // 初始化producer
              producer = new DefaultMQProducer(javaSamplerContext.getParameter(PRODUCER_GROUP));
              producer.setNamesrvAddr(javaSamplerContext.getParameter(NAME_SRV_ADDRESS));
              producer.start();
          } catch (MQClientException e) {
              throw new RuntimeException(e);
          }
      }
      @Override
      public SampleResult runTest(JavaSamplerContext javaSamplerContext) {
          SampleResult sampleResult = new SampleResult();
          sampleResult.setSampleLabel("rocketmq-producer");
          // 请求开始
          sampleResult.sampleStart();
          // 普通消息发送
          Message message = new Message(
              javaSamplerContext.getParameter(TOPIC),
              javaSamplerContext.getParameter(MSG_TAG),
              javaSamplerContext.getParameter(MSG_BODY).getBytes()
          );
          try {
              // 发送消息,需要关注发送结果,并捕获失败等异常。
              SendResult sendResult = producer.send(message);
              // 设置发送请求的字节数
              sampleResult.setSentBytes(message.toString().getBytes(StandardCharsets.UTF_8).length);
              sampleResult.setDataType(SampleResult.TEXT);
              // 设置请求内容
              sampleResult.setSamplerData(message.toString());
              // 设置响应内容
              sampleResult.setResponseData(String.format("Msg Id:%s", sendResult.getMsgId()).getBytes());
              sampleResult.setSuccessful(true);
              sampleResult.setResponseCodeOK();
          } catch (MQBrokerException | InterruptedException | RemotingException | MQClientException e) {
              sampleResult.setSuccessful(false);
              sampleResult.setResponseCode(ERROR_CODE);
              sampleResult.setResponseData(String.format("Error Msg:%s", e).getBytes());
              return sampleResult;
          } finally {
              // 请求结束
              sampleResult.sampleEnd();
          }
          return sampleResult;
      }
      @Override
      public void teardownTest(JavaSamplerContext javaSamplerContext) {
          producer.shutdown();
      }
      @Override
      public Arguments getDefaultParameters() {
          Arguments arguments = new Arguments();
          arguments.addArgument(NAME_SRV_ADDRESS, "");
          arguments.addArgument(PRODUCER_GROUP, "");
          arguments.addArgument(TOPIC, "");
          arguments.addArgument(MSG_KEY, "");
          arguments.addArgument(MSG_TAG, "");
          arguments.addArgument(MSG_BODY, "");
          return arguments;
      }
  }
  步骤三:打包项目成 JAR 文件
  通过 mvn clean package 将项目打包,在 target 目录中可见 jmeter-rocketmq4.jar 和 jmeter-rocketmq4-jar-with-dependencies.jar 两个 JAR 包,其中 jmeter-rocketmq4-jar-with-dependencies.jar 包括了所需的依赖,在后续步骤中使用此 JAR 包。
  .
  ├── pom.xml
  ├── src
  │   ├── main
  │   │   ├── java
  │   │   │   └── JavaSamplerForRocketMQ4.java
  │   │   └── resources
  │   └── test
  │       └── java
  └── target
      ├── jmeter-rocketmq4-jar-with-dependencies.jar
      ├── jmeter-rocketmq4.jar
  步骤四:使用 JMeter GUI 进行脚本编写和调试
  将打包好的 JAR 包和依赖的 JAR 包复制到 JMETER_HOME/lib/ext 目录下,然后执行命令 JMETER_HOME/bin/jmeter 打开 JMeter GUI。
  新建线程组后添加 Java 请求取样器。
  在下拉框中选择步骤二中新增的类(不一定和图片中的完全一致,按照实际的类全限定名选择),并填写下方相关参数。
  为线程组添加“查看结果树”和“汇总报告”监听器,然后启动测试计划,在结果树和汇总报告中验证测试的结果是否符合预期。
  保存该测试计划为 JMX 文件。
  步骤五:在 PTS 创建 JMeter 场景进行压测
  在 PTS 控制台创建 JMeter 环境,将步骤三中打包的 JAR 包上传到该 JMeter 环境中(更多细节请参考 JMeter 环境管理的查看、修改及创建_性能测试-阿里云帮助中心 [ 1] ):
  a. 进入 PTS 控制台,选择“JMeter 环境”;
  b. 输入自定义的环境名;
  c. 点击上传文件,选择步骤三中打包的 JAR 包;
  d. 点击保存。
  在 PTS 控制台创建场景中选择“JMeter 压测”场景:
  编辑“场景配置”:
  a. 自定义场景名;
  b. 点击上传文件,选择步骤四中保存的 JMX 文件;
  c. 在“使用依赖环境?”下拉框中选择“是,使用依赖环境”;
  d. 在“选择依赖环境”下拉框选择刚刚创建的 JMeter 环境。
  施压配置:
  小建议:由于我们是想通过压测找到 RocektMQ 能承受的最大并发请求数,因此建议选择 RPS 模式,这样可以直接衡量 RocektMQ 的承压能力。同时,考虑到公网带宽限制,应该选择阿里云 VPC 内网压测。
  a. 选择压力来源为阿里云 VPC 内网,同时选择部署被压测 RocketMQ 的 ECS 所在区域;
  b. 设置 ECS 的 VPC、安全组和交换机,注意 VPC 和安全组一定要和 ECS 相同,安全组中要打开响应的端口(在 ECS 控制台设置);
  c. 设置压力模式为 RPS 模式;
  d. 设置起始 RPS、最大 RPS 和压测时长,本文设置起始 RPS 为 90000,最大 RPS 为 110000,持续 2 分钟。
  e. 指定循环一般设置为否,表示执行一次就结束,指定 IP 数会根据设置的 RPS 自动生成。
  其余设置请根据需求参考 JMeter 压测_性能测试-阿里云帮助中心 [ 2] 。
  保存配置并调试场景,确认和 RocketMQ 的连通,之后可以开始进行压测。
  步骤六:查看压测报告
  JMeter 的压测报告通用解读可以参考如何查看 JMeter 压测数据、采样日志及施压机性能_性能测试-阿里云帮助中心 [ 3] ,下一节将介绍如何使用 PTS 的压测报告来找到 RocketMQ 的承压能力。
  报告解读
  首先,查看整个压测的概览信息和指标趋势。如下图所示,报告第一栏展示了整个压测过程的请求成功率、平均 RT、平均 TPS 等指标,这些指标可以在官方文档中找到具体解释。同时,根据成功率的趋势图所示,从 18:54:05 开始,成功率逐渐波动下降,此时的 TPS 值为 9.55W,代表 18:54:05 计算的前 5 秒平均 TPS 约为 9.55W。
  其次,使用压测报告中的 Prometheus 监控数据对结果进一步分析。借助阿里云 ARMS 的 Prometheus 和 Grafana 产品,PTS 的压测报告可以提供包括吞吐量、成功率和响应时长的时序图,同时,支持用户使用 PromQL 语句对数据面板进行编辑操作,灵活查询所需的数据,在本文中,我们可以将成功率和吞吐量放在一个 panel,来进一步分析。
  a.首先点击“成功率(时序)”,然后点击“Edit”,可进入成功率大盘的编辑界面,复制成功率的查询 PromQL:
  sum(rate(pts_api_response_total{task_id="$task_id", code=~"200|302"}[5s]))/sum(rate(pts_api_response_total{task_id="$task_id"}[5s]))
  b.然后进入吞吐量大盘的编辑界面,使用成功率的 PromQL 替换虚拟用户数的 PromQL,并更改 Grafana 的相关配置(下图中红框),便可得到展示吞吐量和成功率的面板。
  该面板展示的数据统计精度为 1 秒,可得到更精确的数据,在 18:54:05 秒时,成功率开始下降,此时 TPS 为 96561.9。
  c. 为了更好的评估 RocketMQ 的性能,我们还可以统计出成功率保持 100% 的时间范围内的平均 TPS,首先找到成功率为 100% 的持续时间,下图中为 47 秒,然后将计算 TPS 的指标的时间范围改成 47s,这样每个点都代表前 47s 的平均 TPS,将鼠标移动到成功率为 100% 的最后一个时间,当前时间的 TPS 值即为成功率为 100% 时间范围内的平均 TPS,即 89357.5。
  最后,为了对比不同参数的设置对 RocketMQ 性能的影响,同时验证 PTS 在 RocketMQ 压测上的可用性,我们做了一个简单的对比实验,并通过 jstat 命令来观察不同参数对垃圾回收的影响。
  实验结果显示,对于当前 ECS 配置部署的 RocketMQ,适当调大堆内存可以有效提高 RocketMQ 的性能,当堆内存提高到 24g 时(此事 ECS 内存使用率达到 85.39%),性能没有显著提高;适当提高 sendMessageThreadPoolNums 的值可以提高 RocketMQ 的性能,当 sendMessageThreadPoolNums 超过 16 后,性能没有显著提高,甚至略有下降。用户可以根据实际情况,进行更详细的对比实验,来充分评估所部署的 RocketMQ 承压能力。
  结束语
  本文介绍了使用阿里云 PTS 的 JMeter 场景压测 RocketMQ 的详细步骤,对各环节逐一进行了说明,最后,通过对压测报告的自定义分析,展现了 PTS 强大的压测结果分析能力,借助 JMeter 和 PTS,用户可以对各类中间件进行灵活多维的分析,助力其构建起稳定健壮的系统。
  本文内容不用于商业目的,如涉及知识产权问题,请权利人联系51Testing小编(021-64471599-8017),我们将立即处理
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号