Java 应用通过 OpenTelemetry API 实现手动埋点(2)

上一篇 / 下一篇  2023-09-07 13:49:10

  由于前端 frontend 在请求后端接口的时候我们已经注入了 W3CTraceContext,所以我们只需要在 Java 应用中通过 propagation api 来获取到 span context,然后将其作为父级 span,这样就可以将前端的 span 和后端的 span 关联起来了。
  这里我们可以添加一个拦截器来使用 propagation 接口解析 span context,代码如下所示:
  // src/main/java/com/youdianzhishi/orderservice/interceptor/OpenTelemetryInterceptor.java
  package com.youdianzhishi.orderservice.interceptor;
  // ......
  @Component
  public class OpenTelemetryInterceptor implements HandlerInterceptor {
      @Autowired
      private OpenTelemetry openTelemetry;
      @Override
      public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
          TextMapGetter<HttpServletRequest> getter = new TextMapGetter<>() {
              @Override
              public Iterable<String> keys(HttpServletRequest carrier) {
                  return Collections.list(carrier.getHeaderNames());
              }
              @Override
              public String get(HttpServletRequest carrier, String key) {
                  return carrier.getHeader(key);
              }
          };
          // 提取传入的Trace Context
          Context extractedContext = openTelemetry.getPropagators().getTextMapPropagator()
                                         .extract(Context.current(), request, getter);
          StringBuilder sb = new StringBuilder();
          sb.append(request.getMethod()).append(" ").append(request.getRequestURI());
          Span span = tracer.spanBuilder(sb.toString()).setParent(extractedContext)
                      .startSpan();
          // 将解析出来的SpanContext存储在请求属性中,以便后续使用
          request.setAttribute("currentSpan", span);
          return true;
      }
  }
  上面代码中我们首先通过 openTelemetry.getPropagators().getTextMapPropagator() 方法来获取到 TextMapPropagator,然后通过 extract 方法来解析 span context,然后将解析出来的 span context 设置为子 span 的父级 span,最后将 span context 存储在请求属性中,以便后续使用。
  这里的关键是在初始化 OpenTelemetry 的时候需要配置 ContextPropagators,代码如下所示:
  // 初始化 ContextPropagators,这里我们配置包含 W3C Trace Context 和 W3C Baggage
  ContextPropagators propagators = ContextPropagators.create(
          TextMapPropagator.composite(
                  W3CTraceContextPropagator.getInstance(),
                  W3CBaggagePropagator.getInstance()));
  这样我们才能去解析 TraceContext 和 Baggage 两种上下文传播机制。而其中的 getter 就是用来从 HTTP 请求头中获取 span context 的方式。
  当然最后我们还需要在 WebMvcConfig 中注册该拦截器,代码如下所示:
  // src/main/java/com/youdianzhishi/orderservice/config/WebMvcConfig.java
  package com.youdianzhishi.orderservice.config;
  // ......
  @Configuration
  @Order(4)
  public class WebMvcConfig implements WebMvcConfigurer {
      @Autowired
      private TokenInterceptor tokenInterceptor;
      @Autowired
      private OpenTelemetryInterceptor otelCtxInterceptor;
      @Override
      public void addInterceptors(InterceptorRegistry registry) {
          registry.addInterceptor(otelCtxInterceptor)
              .addPathPatterns("/api/orders/**");
          registry.addInterceptor(tokenInterceptor)
              .addPathPatterns("/api/orders/**") // 指定拦截器应该应用的路径模式
              .excludePathPatterns("/api/login", "/api/register"); // 指定应该排除的路径模式
      }
  }
  这样当我们在请求 /api/orders/** 下面的接口时,就可以从请求属性中获取父级的 span context 了。
  现在我们重新修改 getAllOrders 处理器,代码如下所示:
  @GetMapping
  public ResponseEntity<List<OrderDto>> getAllOrders(HttpServletRequest request) {
      // 从请求属性中获取 Span
      Span span = (Span) request.getAttribute("currentSpan");
      try {
          // 从拦截器中获取用户信息
          User user = (User) request.getAttribute("user");
          // 要根据 orderDate 倒序排列
          List<Order> orders = orderRepository.findByUserIdOrderByOrderDateDesc(user.getId());
          // 将Order转换为OrderDto
          List<OrderDto> orderDtos = orders.stream().map(order -> {
              try {
                  return order.toOrderDto(webClient);
              } catch (Exception e) {
                  throw new RuntimeException(e);
              }
          }).collect(Collectors.toList());
          span.setAttribute("user_id", user.getId());
          span.setAttribute("order_count", orders.size());
          return new ResponseEntity<>(orderDtos, HttpStatus.OK);
      } catch (Exception e) {
          // 记录 Span 错误
          span.recordException(e).setStatus(StatusCode.ERROR, e.getMessage());
          return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
      } finally {
          // 记录 Span 结束时间
          span.end();
      }
  }
  这里我们首先通过请求属性获取到 span context,这里我们添加了两个属性,如果出现了异常则会记录异常信息,最后在 finally 代码块中结束 Span。
  现在我们重新启动容器,当我们访问订单列表后就可以看到 Jaeger UI 中多了一个 GET /api/orders 的 span 了,并且该 span 和前端 frontend 服务的 span 关联起来了。
  当然这还不够,因为我们的订单列表接口还会去请求 user-service 服务来获取用户信息,还会去请求 catalog-service 服务获取书籍信息,所以我们还需要在这两个请求中也注入我们这里的 span,这样就可以将整个链路串联起来了。
  首先针对 TokenInterceptor 拦截器我们先创建一个子 span,代码如下所示:
  // src/main/java/com/youdianzhishi/orderservice/interceptor/TokenInterceptor.java
  package com.youdianzhishi.orderservice.interceptor;
  // ......
  @Component
  public class TokenInterceptor implements HandlerInterceptor {
      @Autowired
      private WebClient webClient;
      @Autowired
      private Tracer tracer;
      @Override
      public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
          // 先获取 Span
          Span currentSpan = (Span) request.getAttribute("currentSpan");
          Context context = Context.current().with(currentSpan);
          // 创建新的 Span,作为子 Span
          Span span = tracer.spanBuilder("GET /api/userinfo")
              .setParent(context).startSpan();
          // 将子 Span 设置为当前上下文,相当于切换上下文到子 Span
          try (Scope scope = span.makeCurrent()) {
              try {
                  String token = request.getHeader("Authorization");
                  if (token == null) {
                      response.setStatus(HttpStatus.UNAUTHORIZED.value());
                      span.addEvent("Token is null").setStatus(StatusCode.ERROR);
                      return false;
                  }
                  // 从环境变量中获取 userServiceUrl
                  String userServiceEnv = System.getenv("USER_SERVICE_URL");
                  String userServiceUrl = userServiceEnv != null ? userServiceEnv : "http://localhost:8080";
                  User user = webClient.get()
                          .uri(userServiceUrl + "/api/userinfo")
                          .header(HttpHeaders.AUTHORIZATION, token)
                          .retrieve()
                          .onStatus(httpStatus -> httpStatus.equals(HttpStatus.UNAUTHORIZED),
                                  clientResponse -> Mono.error(new RuntimeException("Unauthorized")))
                          .onStatus(
                                  httpStatus -> httpStatus.is4xxClientError()
                                          && !httpStatus.equals(HttpStatus.UNAUTHORIZED),
                                  clientResponse -> Mono.error(new RuntimeException("Other Client Error")))
                          .bodyToMono(User.class)
                          .block();
                  if (user != null) {
                      request.setAttribute("user", user);
                      span.setAttribute("user_id", user.getId());
                      return true;
                  } else {
                      response.setStatus(HttpStatus.UNAUTHORIZED.value());
                      span.addEvent("User is null").setStatus(StatusCode.ERROR);
                      return false;
                  }
              } catch (RuntimeException e) {
                  span.recordException(e).setStatus(StatusCode.ERROR, e.getMessage());
                  if (e.getMessage().equals("Unauthorized")) {
                      response.setStatus(HttpStatus.UNAUTHORIZED.value());
                  } else {
                      response.setStatus(HttpStatus.BAD_REQUEST.value());
                  }
                  return false;
              } catch (Exception e) {
                  span.recordException(e).setStatus(StatusCode.ERROR, e.getMessage());
                  response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value());
                  return false;
              } finally {
                  request.setAttribute("parentSpan", span);
                  span.end();
              }
          }
      }
  }
  在上面代码中我们首先获取当前上下文的 Span,然后创建一个名为 GET /api/userinfo 的 span,将其设置为当前上下文的子 span,并将上下文切换到当前子 span,然后执行我们的业务逻辑,最后结束子 span。
  然后我们可以统一在 WebClient 中来注入 span context,这样当我们 Java 服务请求其他服务的时候就可以形成链路。
  // src/main/java/com/youdianzhishi/orderservice/config/WebClientConfig.java
  package com.youdianzhishi.orderservice.config;
  // ......
  @Configuration
  @Order(3)
  public class WebClientConfig {
      @Autowired
      private OpenTelemetry openTelemetry;
      @Bean
      public WebClient webClient() {
          return WebClient.builder().filter(traceExchangeFilterFunction()).build();
      }
      @Bean
      public ExchangeFilterFunction traceExchangeFilterFunction() {
          return (clientRequest, next) -> {
              // 获取当前上下文的 Span
              Span currentSpan = Span.current();
              Context context = Context.current().with(currentSpan);
              // 创建新的请求头并添加跟踪信息
              HttpHeaders newHeaders = new HttpHeaders();
              newHeaders.putAll(clientRequest.headers());
              TextMapSetter<HttpHeaders> setter = new TextMapSetter<HttpHeaders>() {
                  @Override
                  public void set(HttpHeaders carrier, String key, String value) {
                      carrier.add(key, value);
                  }
              };
              // 将当前上下文的 Span 注入到请求头中
              openTelemetry.getPropagators().getTextMapPropagator().inject(context, newHeaders, setter);
              // 创建一个新的 ClientRequest 对象
              ClientRequest newRequest = ClientRequest.from(clientRequest)
                      .headers(headers -> headers.addAll(newHeaders))
                      .build();
              return next.exchange(newRequest);
          };
      }
  }
  在上面代码中我们为 WebClient 添加了一个名为 traceExchangeFilterFunction 的过滤器函数,在该函数中我们首先获取当前上下文的 Span,然后创建一个新的请求头并添加跟踪信息,最后将当前上下文的 Span 通过 Propagator 接口注入到请求头中,这样当我们请求其他服务的时候就可以形成链路了。
  现在我们重新启动容器,当我们访问订单列表后就可以看到 Jaeger UI 中多了一个 GET /api/userinfo 的 span 了,并且该 span 和还会和 user-service 服务的 span 关联起来。
  同样的方式我们还可以在 getAllOrders 处理器中添加数据库查询的 span,代码如下所示:
  // 新建一个 DB 查询的 span
  Span dbSpan = tracer.spanBuilder("DB findByUserIdOrderByOrderDateDesc").setParent(context).startSpan();
  // 要根据 orderDate 倒序排列
  List<Order> orders = orderRepository.findByUserIdOrderByOrderDateDesc(user.getId());
  dbSpan.addEvent("OrderRepository findByUserIdOrderByOrderDateDesc From DB");
  dbSpan.setAttribute("order_count", orders.size());
  dbSpan.end();
  将 Order 转换为 OrderDto 也可以添加一个 span,代码如下所示:
  // src/main/java/com/youdianzhishi/orderservice/model/Order.java
  package com.youdianzhishi.orderservice.model;
  // ......
  public OrderDto toOrderDto(WebClient webClient, Tracer tracer, Context context) throws Exception {
      // 创建新的 Span,作为子 Span
      Span span = tracer.spanBuilder("GET /api/books/batch").setParent(context).startSpan();
      try (Scope scope = span.makeCurrent()) { // 切换上下文到子 Span
          span.setAttribute("order_id", this.getId());
          span.setAttribute("status", this.getStatus());
          OrderDto orderDto = new OrderDto();
          orderDto.setId(this.getId());
          orderDto.setStatus(this.getStatus());
          SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          String strDate = formatter.format(this.getOrderDate());
          orderDto.setOrderDate(strDate);
          List<Integer> bookIds = this.getBookIds(); // 假设你有一个可以获取书籍ID的方法
          // 将 bookIds 转换为字符串,以便于传递给 WebClient
          String bookIdsStr = bookIds.stream().map(String::valueOf).collect(Collectors.joining(","));
          span.addEvent("get book ids");
          span.setAttribute("book_ids", bookIdsStr);
          // 用 WebClient 调用批量查询书籍的服务接口
          // 从环境变量中获取 bookServiceUrl
          String catalogServiceEnv = System.getenv("CATALOG_SERVICE_URL");
          String catalogServiceUrl = catalogServiceEnv != null ? catalogServiceEnv : "http://localhost:8082";
          Mono<List<BookDto>> booksMono = webClient.get() // 假设你有一个webClient实例
                  .uri(catalogServiceUrl + "/api/books/batch?ids=" + bookIdsStr)
                  .retrieve()
                  .bodyToMono(new ParameterizedTypeReference<>() {
                  });
          List<BookDto> books = booksMono.block();
          span.addEvent("get books info from catalog service");
          // 还需要将书籍数量和总价填充到 OrderDto 对象中
          int totalAmount = 0;
          int totalCount = 0;
          List<BookQuantity> bqs = this.getBookQuantities();
          for (BookDto book : books) {
              // 如果 book.id 在 bqs 中,那么就将对应的数量设置到 book.quantity 中
              int quantity = bqs.stream().filter(bq -> bq.getId() == book.getId()).findFirst().get().getQuantity();
              book.setQuantity(quantity);
              totalCount += quantity;
              totalAmount += book.getPrice() * quantity;
          }
          orderDto.setBooks(books);
          orderDto.setAmount(totalAmount);
          orderDto.setTotal(totalCount);
          span.addEvent("calculate total amount and total count");
          span.end();
          return orderDto;
      }
  }
  这里同样我们会为每一个转换创建一个子 span,然后将其设置为当前上下文的子 span,最后结束子 span,这样当我们通过 WebClient 去请求 catalog-service 服务的时候也就可以形成链路了。
  最后我们再去查看下完整的链路,如下图所示:

6-7

6-7

TAG: 软件开发 Java java

 

评分:0

我来说两句

Open Toolbar