如何做到Python和Java之间相互调用

发表于:2022-12-19 10:02

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:Kerry___    来源:CSDN

  需求使用背景, 业务系统自定义了一套数据处理语言,支持sql,elasticsearch查询数据,然后经过一系列自定义处理,自定义处理支持执行python脚本,最终返回给调用端。
  具体执行流程,java服务执行页面输入的python语句,python语句调用java方法得到结果,python进行自定义处理后再返回给java服务,java服务最终返回具体的结果给客户端。
  语法示例 其中, | dbquery , | python 为自定义语法示例,最终是通过java执行返回结果。
  | dbquery sql="select id,num,create_time from security_log"
  | python
  import sys
  from py4j.java_gateway import JavaGateway, GatewayParameters
  gateway = JavaGateway(gateway_parameters=GatewayParameters(port=int(sys.argv[1])))
  SESSION_ID = sys.argv[2]
   
  # 获取java 实例类
  entry= gateway.entry_point
  # 调用 java具体方法,java返回具体的json
  preDataJson = entry.getPreDateJson(SESSION_ID)
   
  # 以下开始处理java返回的结果json
  import pandas as pd
  df = pd.read_json(preDataJson)
  rtn_data = df[:2].to_json(orient='records')
  # 处理完成的rtn_data 再设置给java 实例
  entry.setDatasetJson(SESSION_ID,rtn_data )
  首先java服务中引用依赖。
      <dependency>
        <groupId>net.sf.py4j</groupId>
        <artifactId>py4j</artifactId>
        <version>0.10.7</version>
      </dependency>
  首先java端定义使用pythonServer网关入口。
  public class PythonServer {
   
      /**
       * python 网关服务
       */
      private GatewayServer gatewayServer;
   
      /**
       * python 服务启动端口
       */
      private int port = GatewayServer.DEFAULT_PORT;
   
      /**
       * 缓存当前几个 python reader
       */
      private Map<String, PythonComd> pythonComdMap = new ConcurrentHashMap<>();
   
      public PythonServer() {
          for (int i = 0; i < 10; i++) {
              if (gatewayServer == null) {
                  try {
                      gatewayServer = new GatewayServer(new EntryPoint(), port);
                      gatewayServer.start();
                      log.info("Python 网关启动成功,端口:" + port);
                  } catch (Exception e) {
                      log.warn("Python 网关启动失败,端口:" + port);
                      port += 2;//每次加2
                      log.warn("Python 网关尝试下一个启动端口:" + port);
                  }
              }
          }
          if (gatewayServer == null) {
              log.error("Python 网关启动失败 10 次,不再进行重试,终止启动。");
              System.exit(-1);
          }
      }
   
      /**
       * 打开一个 java to python 的会话,实际只是 put 一个识别自身的 uuid
       * @param uuid
       * @param pythonComd
       */
      public void openSession(String uuid, PythonComd pythonComd) {
          pythonComdMap.put(uuid, pythonComd);
      }
   
      /**
       * 关闭一个 java to python 的会话
       * @param uuid
       */
      public void closeSession(String uuid) {
          pythonComdMap.remove(uuid);
      }
   
      /**
       * 用于 python 与 java GatewayServer 通信的端口号
       * @return
       */
      public int getPort() {
          return port;
      }
   
      /**
       * 当 java 进程关闭时,结束 python 网关。
       */
      @PreDestroy
      public void shutdown() {
          gatewayServer.shutdown();
      }
   
      class EntryPoint {
   
          /**
           * 可以传递 json 给 python,但限制最大加载条数为 ENGINE_QUERY_LIMIT
           * 优点:结构化数据,支持嵌套数组。缺点:空数组时没有字段名。
           * @param uuid
           * @return
           */
          public String getPreDataJson(String uuid) {
              PythonComd pythonComd = pythonComdMap.get(uuid);
              if (pythonComd == null) {
                  return null;
              }
              Dataset<Row> preDataset = pythonComd.getPreDateset();
              if (preDataset == null) {
                  return null;
              }
              try {
                  SparkContext sparkContext = preDataset.sparkSession().sparkContext();
                  PythonConf conf = pythonComd.getConf();
                  String taskId = conf.getTaskId();
                  String description = AppConfig.ENGINE_ADDR + "/search?taskId=" + taskId;// 仅用于 spark web ui 显示,暂时没有扩展此接口。
                  sparkContext.setJobGroup(uuid, description, true);
                  sparkContext.setLocalProperty(CallSite.SHORT_FORM(), "Python 进程获取 JSON 格式结果集。");
   
                  List<Map<String, Object>> list = DatasetUtil.datasetToList(preDataset, AppConfig.ENGINE_QUERY_LIMIT, taskId, "python 进程获取 json 格式结果集。");
                  return JsonUtil.objToJson(list);
              } catch (GplException e) {
                  log.error(e.getMessage(), e);//2021-02-04,暂时没想明白 python 拉取 java 时,异常传递给前端
              }
              return null;
          }
   
   
   
          /**
           * 接收 python 返回的 json。
           * @param uuid
           * @param json
           */
          public void setDataJson(String uuid, String json) {
              PythonComd pythonComd = pythonComdMap.get(uuid);
              if (pythonComd == null) {
                  return;
              }
              if (json == null || json.isEmpty()) {
                  PythonConf conf = pythonComd.getConf();
                  log.error("任务 id:" + conf.getTaskId() + "," + conf.getComdName() + " 命令,Python 进程返回的 JSON 数据为空!");
                  return;
              }
              List<String> list = new ArrayList<>();
              list.add(json);
              SparkSession sparkSession = BeanFactory.getSparkSession();
              Dataset<String> datasetTmp = sparkSession.createDataset(list, Encoders.STRING());
              Dataset<Row> dataset = sparkSession.read().json(datasetTmp);
              pythonComd.setReturnResult(dataset);
          }
      }
  }
  java端执行python代码。
  public class PythonComd extends BaseComd<PythonConf> {
   
      /**
       * 单例,用于和 python 进程通信。
       */
      @Resource
      private PythonServer pythonServer;
   
      /**
       * python 进程执行结束后,回调 pythonComd.setReturnResult(dataset);
       * execSparkDataset() 方法,返回 this.returnResult;
       */
      @Setter
      private Dataset<Row> returnResult;
   
      /**
       * 输出流字符串。
       */
      private StringBuilder outputStringBuilder = new StringBuilder();
   
      /**
       * 错误流字符串。
       */
      private StringBuilder errorStringBuilder = new StringBuilder();
   
      /**
       * 构造方法,传入配置类。
       * @param conf
       */
      public PythonComd(PythonConf conf) {
          super(conf);
      }
   
   
      /**
       * 以命令行的方式,执行 python 进程。
       * @param preDataset 上一条命令生成的结果集对象
       * @return
       * @throws GplException
       */
      @Override
      public Dataset<Row> execSparkDataset(Dataset<Row> preDataset) throws GplException {
          if (preDataset == null) {
              throw new GplException("python 命令,未找到结果集!");
          }
          String script = conf.getExpression();
          String fileName = conf.getFilename();
          if (StringUtils.isEmpty(script) && StringUtils.isEmpty(fileName)) {
              throw new GplException("python 命令,脚本为空!");
          }
          if (StringUtils.isEmpty(script) && !StringUtils.isEmpty(fileName)) {//如果是执行py文件
              File pyFile = new File(NConst.PYTHON_PATH + fileName);
              if (!pyFile.exists()) {
                  throw new GplException("python 命令,GPL 的 python 目录找不到脚本文件:" + fileName);
              }
          }
          if (!StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
              File file = new File(AppConfig.PYTHON_ENV);
              if (!file.exists()) {
                  throw new GplException("python 命令,自定义环境变量,路径不存在:" + AppConfig.PYTHON_ENV);
              }
          }
   
          String uuid = conf.getUuid();
          pythonServer.openSession(uuid, this);
          String containerName = uuid;
          try {
              String scriptPath = null;
              if (!StringUtils.isEmpty(script)) {//优先执行脚本,如果没有脚本语句赐执行文件
                  String md5 = DigestUtils.md5Hex(script);
                  String tmpName = md5 + ".py";
                  scriptPath = NConst.PYTHON_PATH + tmpName;
                  File tmpFile = new File(scriptPath);//临时文件不再结束调用后删除,以后完善成定期清理
                  FileUtils.write(tmpFile, script, NConst.CHARSET_UTF_8);//把脚本写成临时文件
              } else {
                  scriptPath = NConst.PYTHON_PATH + fileName;
              }
   
              List<String> command = new ArrayList<>();
              if (!StringUtils.isEmpty(conf.getDocker())) {
                  // docker run --name containerName -v /home/nyx:/home/nyx --net host python:slim python /home/nyx/test.py 25333 uuid
                  command.add("docker");
                  command.add("run");
                  command.add("--name");
                  command.add(containerName);//容器名
                  command.add("-e");
                  command.add("TZ=Asia/Shanghai");//设置时区
                  command.add("-v");
                  command.add(NConst.PYTHON_PATH + ":" + NConst.PYTHON_PATH);// 绑定挂载目录
                  command.add("--net");
                  command.add("host");
                  command.add(conf.getDocker());// docker 镜像名
              }
   
              if (StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
                  command.add("python");
              } else {
                  command.add(AppConfig.PYTHON_ENV);
              }
              command.add(scriptPath);//py文件路径
              command.add(String.valueOf(pythonServer.getPort()));//端口号
              command.add(uuid);
              String cmd = String.join(" ", command);
              log.info("任务 id:" + conf.getTaskId() + ",python 执行命令:" + cmd);
              ProcessBuilder processBuilder = new ProcessBuilder(command);
              Process process = processBuilder.start();
              this.printProcessStream(process);
              int timeout = conf.getTimeout();
              log.info("任务 id:" + conf.getTaskId() + ",python 脚本执行超时参数为:" + timeout);
              if (timeout > 0) {
                  boolean isExited = process.waitFor(timeout, TimeUnit.MILLISECONDS);//脚本执行超时退出
                  if (!isExited) {//如果命令行子进程没有退出,销毁子进程。
                      process.destroy();
                  }
              } else {
                  process.waitFor();
              }
              this.clearTmpFile();//最后执行一下清理目录
          } catch (Exception e) {
              log.error(e.getMessage(), e);
          } finally {
              if (!StringUtils.isEmpty(conf.getDocker())) {
                  try {
                      Process process = new ProcessBuilder("docker", "rm", "-f", containerName).start();
                      this.printProcessStream(process);
                      process.waitFor();
                      log.info("任务 id:" + conf.getTaskId() + ",docker 执行 python 结束!删除容器:" + containerName);
                  } catch (Exception e) {
                      log.error(e.getMessage(), e);
                  }
              }
              pythonServer.closeSession(uuid);
          }
          if (errorStringBuilder.length() > 0) {
              String error = errorStringBuilder.toString();
              throw new GplException("python 命令,脚本执行异常!", error);
          }
          if (outputStringBuilder.length() > 0) {
              String input = outputStringBuilder.toString();
              comdTask.addWarnMsg("python 命令,控制台打印:" + input);
          }
          if (returnResult == null) {
              throw new RuntimeException("python 命令,脚本执行结束,脚本返回结果集为空!");
          }
          // 此处不要计算 count(),会消耗掉几十秒,直接打印一下字段名就行了。
          Map<String, String> fieldMap = new HashMap<>();
          StructType structType = returnResult.schema();
          StructField[] structFieldArray = structType.fields();
          for (StructField structField : structFieldArray) {
              fieldMap.put(structField.name(), structField.dataType().simpleString());
          }
          log.info("任务 id:" + conf.getTaskId() + ",python 命令,返回结果集字段:" + fieldMap.toString());
          return this.returnResult;
      }
   
      /**
       * 清理生成的临时文件
       */
      private void clearTmpFile() {
          File pyPath = new File(NConst.PYTHON_PATH);
          File[] fileArray = pyPath.listFiles(f -> {
              return f.getName().length() == 35//文件名长度 35 的是临时文件
                      && System.currentTimeMillis() - f.lastModified() > 86400000;//删除大于 1 天的文件
          });
          for (File file : fileArray) {
              file.delete();
          }
      }
   
      /**
       * 启动两个后台线程,用于接收 python 进程的输出流。
       * @param process
       */
      private void printProcessStream(Process process) {
          Thread inputThread = new Thread(() -> {
              try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                  String input = null;
                  while ((input = inputReader.readLine()) != null) {
                      outputStringBuilder.append("\n");
                      outputStringBuilder.append(input);
                  }
                  if (outputStringBuilder.length() > 0) {
                      log.info("任务 id:" + conf.getTaskId() + ",python print(): " + outputStringBuilder.toString());
                  }
              } catch (Exception e) {
                  log.error(e.getMessage(), e);
              }
          });
          inputThread.setName("gpl-py-input-stream-task-id=" + conf.getTaskId());
          inputThread.setDaemon(true);
          inputThread.start();
   
          Thread errorThread = new Thread(() -> {
              try (BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
                  String error = null;
                  while ((error = errorReader.readLine()) != null) {
                      errorStringBuilder.append("\n");
                      errorStringBuilder.append(error);
                  }
                  if (errorStringBuilder.length() > 0) {
                      log.error("任务 id:" + conf.getTaskId() + ",python error: " + errorStringBuilder.toString());
                  }
              } catch (Exception e) {
                  log.error(e.getMessage(), e);
              }
          });
          errorThread.setName("py-error-stream-task-id=" + conf.getTaskId());
          errorThread.setDaemon(true);
          errorThread.start();
      }
  }
  本文内容不用于商业目的,如涉及知识产权问题,请权利人联系51Testing小编(021-64471599-8017),我们将立即处理
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号