一篇文章看懂TPCx-BB(大数据基准测试工具)源码

发表于:2017-11-23 08:03

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:Jeffbond    来源:简书

  run()
  run() 方法是 RunBigBench.jar 里核心的方法。所有的执行都是通过 run() 方法调用的。比如 runQueries()、runModule()、generateData()等。runQueries()、runModule()、generateData() 又通过调用 runCmd() 方法来创建操作系统进程,执行bash命令,调用bash脚本。
  run() 方法里通过一个 while 循环来逐一执行 workload 里的每一个 benchmarkPhase。 不同的 benchmarkPhase 会调用 runQueries()、runModule()、generateData()...中的不同方法。
  try {
  long e = 0L;
  this.log.finer("Benchmark phases: " + this.benchmarkPhases);
  Iterator startCheckpoint = this.benchmarkPhases.iterator();
  long throughputStart;
  while(startCheckpoint.hasNext()) {
  BigBench.BenchmarkPhase children = (BigBench.BenchmarkPhase)startCheckpoint.next();
  if(children.isPhaseDone()) {
  this.log.info("The phase " + children.name() + " was already performed earlier. Skipping this phase");
  } else {
  try {
  switch($SWITCH_TABLE$io$bigdatabenchmark$v1$driver$BigBench$BenchmarkPhase()[children.ordinal()]) {
  case 1:
  case 20:
  throw new IllegalArgumentException("The value " + children.name() + " is only used internally.");
  case 2:
  this.log.info(children.getConsoleMessage());
  e = System.currentTimeMillis();
  break;
  case 3:
  if(!BigBench.BenchmarkPhase.BENCHMARK_START.isPhaseDone()) {
  throw new IllegalArgumentException("Error: Cannot stop the benchmark before starting it");
  }
  throughputStart = System.currentTimeMillis();
  this.log.info(String.format("%-55s finished. Time: %25s", new Object[]{children.getConsoleMessage(), BigBench.Helper.formatTime(throughputStart - e)}));
  this.logTreeRoot.setCheckpoint(new BigBench.Checkpoint(BigBench.BenchmarkPhase.BENCHMARK, -1L, -1L, e, throughputStart, this.logTreeRoot.isSuccessful()));
  break;
  case 4:
  case 15:
  case 18:
  case 22:
  case 27:
  case 28:
  case 29:
  this.runModule(children, this.userArguments);
  break;
  case 5:
  case 10:
  case 11:
  this.runQueries(children, 1, validationArguments);
  break;
  case 6:
  case 9:
  this.runModule(children, validationArguments);
  break;
  case 7:
  this.generateData(children, false, validationArguments);
  break;
  case 8:
  this.generateData(children, true, validationArguments);
  break;
  case 12:
  case 19:
  case 24:
  this.runQueries(children, 1, this.userArguments);
  break;
  case 13:
  case 14:
  case 21:
  case 23:
  case 25:
  case 26:
  this.runQueries(children, this.numberOfParallelStreams, this.userArguments);
  break;
  case 16:
  this.generateData(children, false, this.userArguments);
  break;
  case 17:
  this.generateData(children, true, this.userArguments);
  }
  children.setPhaseDone(true);
  } catch (IOException var21) {
  this.log.info("==============\nBenchmark run terminated\nReason: An error occured while running a command in phase " + children + "\n==============");
  var21.printStackTrace();
  if(this.stopAfterFailure || children.mustSucceed()) {
  break;
  }
  }
  }
  }
  这里的 case 1-29 并不是 1-29 条查询,而是枚举类型里的 1-29 个 benchmarkPhase 。如下所示:
  private static enum BenchmarkPhase {
  BENCHMARK((String)null, "benchmark", false, false, false, false, "BigBench benchmark"),
  BENCHMARK_START((String)null, "benchmark_start", false, false, false, false, "BigBench benchmark: Start"),
  BENCHMARK_STOP((String)null, "benchmark_stop", false, false, false, false, "BigBench benchmark: Stop"),
  CLEAN_ALL("cleanAll", "clean_all", false, false, false, false, "BigBench clean all"),
  ENGINE_VALIDATION_CLEAN_POWER_TEST("cleanQuery", "engine_validation_power_test", false, false, false, false, "BigBench engine validation: Clean power test queries"),
  ENGINE_VALIDATION_CLEAN_LOAD_TEST("cleanMetastore", "engine_validation_metastore", false, false, false, false, "BigBench engine validation: Clean metastore"),
  ENGINE_VALIDATION_CLEAN_DATA("cleanData", "engine_validation_data", false, false, false, false, "BigBench engine validation: Clean data"),
  ENGINE_VALIDATION_DATA_GENERATION("dataGen", "engine_validation_data", false, false, false, true, "BigBench engine validation: Data generation"),
  ENGINE_VALIDATION_LOAD_TEST("populateMetastore", "engine_validation_metastore", false, false, false, true, "BigBench engine validation: Populate metastore"),
  ENGINE_VALIDATION_POWER_TEST("runQuery", "engine_validation_power_test", false, false, false, false, "BigBench engine validation: Power test"),
  ENGINE_VALIDATION_RESULT_VALIDATION("validateQuery", "engine_validation_power_test", false, false, true, false, "BigBench engine validation: Check all query results"),
  CLEAN_POWER_TEST("cleanQuery", "power_test", false, false, false, false, "BigBench clean: Clean power test queries"),
  CLEAN_THROUGHPUT_TEST_1("cleanQuery", "throughput_test_1", false, false, false, false, "BigBench clean: Clean first throughput test queries"),
  CLEAN_THROUGHPUT_TEST_2("cleanQuery", "throughput_test_2", false, false, false, false, "BigBench clean: Clean second throughput test queries"),
  CLEAN_LOAD_TEST("cleanMetastore", "metastore", false, false, false, false, "BigBench clean: Load test"),
  CLEAN_DATA("cleanData", "data", false, false, false, false, "BigBench clean: Data"),
  DATA_GENERATION("dataGen", "data", false, false, false, true, "BigBench preparation: Data generation"),
  LOAD_TEST("populateMetastore", "metastore", false, false, false, true, "BigBench phase 1: Load test"),
  POWER_TEST("runQuery", "power_test", false, true, false, false, "BigBench phase 2: Power test"),
  THROUGHPUT_TEST((String)null, "throughput_test", false, false, false, false, "BigBench phase 3: Throughput test"),
  THROUGHPUT_TEST_1("runQuery", "throughput_test_1", true, true, false, false, "BigBench phase 3: First throughput test run"),
  THROUGHPUT_TEST_REFRESH("refreshMetastore", "throughput_test_refresh", false, false, false, false, "BigBench phase 3: Throughput test data refresh"),
  THROUGHPUT_TEST_2("runQuery", "throughput_test_2", true, true, false, false, "BigBench phase 3: Second throughput test run"),
  VALIDATE_POWER_TEST("validateQuery", "power_test", false, false, true, false, "BigBench validation: Power test results"),
  VALIDATE_THROUGHPUT_TEST_1("validateQuery", "throughput_test_1", false, false, true, false, "BigBench validation: First throughput test results"),
  VALIDATE_THROUGHPUT_TEST_2("validateQuery", "throughput_test_2", false, false, true, false, "BigBench validation: Second throughput test results"),
  SHOW_TIMES("showTimes", "show_times", false, false, true, false, "BigBench: show query times"),
  SHOW_ERRORS("showErrors", "show_errors", false, false, true, false, "BigBench: show query errors"),
  SHOW_VALIDATION("showValidation", "show_validation", false, false, true, false, "BigBench: show query validation results");
  private String runModule;
  private String namePattern;
  private boolean queryOrderRandom;
  private boolean queryOrderCached;
  private boolean printStdOut;
  private boolean mustSucceed;
  private String consoleMessage;
  private boolean phaseDone;
  private BenchmarkPhase(String runModule, String namePattern, boolean queryOrderRandom, boolean queryOrderCached, boolean printStdOut, boolean mustSucceed, String consoleMessage) {
  this.runModule = runModule;
  this.namePattern = namePattern;
  this.queryOrderRandom = queryOrderRandom;
  this.queryOrderCached = queryOrderCached;
  this.printStdOut = printStdOut;
  this.mustSucceed = mustSucceed;
  this.consoleMessage = consoleMessage;
  this.phaseDone = false;
  }
  case 后面的数字对应枚举常量的声明顺序(即 children.ordinal()+1):1对应 BENCHMARK,2对应 BENCHMARK_START,3对应 BENCHMARK_STOP,4对应 CLEAN_ALL,……,29对应 SHOW_VALIDATION,依此类推...
  可以看出:
  CLEAN_ALL、CLEAN_LOAD_TEST、LOAD_TEST、THROUGHPUT_TEST_REFRESH、SHOW_TIMES、SHOW_ERRORS、SHOW_VALIDATION等benchmarkPhases调用的是
  this.runModule(children, this.userArguments);
  方法是 runModule ,参数是 this.userArguments。
  ENGINE_VALIDATION_CLEAN_POWER_TEST、ENGINE_VALIDATION_POWER_TEST、ENGINE_VALIDATION_RESULT_VALIDATION 调用的是
  this.runQueries(children, 1, validationArguments);
  方法是 runQueries ,参数是 1(stream number) 和 validationArguments。
  ENGINE_VALIDATION_CLEAN_LOAD_TEST 和 ENGINE_VALIDATION_LOAD_TEST 调用的是
  this.runModule(children, validationArguments);
  ENGINE_VALIDATION_CLEAN_DATA 调用的是
  this.generateData(children, false, validationArguments);
  ENGINE_VALIDATION_DATA_GENERATION 调用的是
  this.generateData(children, true, validationArguments);
  CLEAN_POWER_TEST、POWER_TEST、VALIDATE_POWER_TEST 调用的是
  this.runQueries(children, 1, this.userArguments);
  CLEAN_THROUGHPUT_TEST_1、CLEAN_THROUGHPUT_TEST_2、THROUGHPUT_TEST_1、THROUGHPUT_TEST_2、VALIDATE_THROUGHPUT_TEST_1、VALIDATE_THROUGHPUT_TEST_2 调用的是
  this.runQueries(children, this.numberOfParallelStreams, this.userArguments);
  CLEAN_DATA 调用的是
  this.generateData(children, false, this.userArguments);
  DATA_GENERATION 调用的是
  this.generateData(children, true, this.userArguments);
  总结一下以上的方法调用可以发现:
  跟 ENGINE_VALIDATION 相关的benchmarkPhase用的参数都是 validationArguments。其余用的是 userArguments( validationArguments 和 userArguments 唯一的区别是 validationArguments 的 SCALE_FACTOR 恒为1)
  跟 POWER_TEST 相关的都是调用 runQueries() 方法,因为 POWER_TEST 就是执行SQL查询
  跟 CLEAN_DATA DATA_GENERATION 相关的都是调用 generateData() 方法
  跟 LOAD_TEST SHOW 相关的都是调用 runModule() 方法
  benchmarkPhase 和 module 对应关系
  具体每个 benchmarkPhase 跟 module(执行的脚本)的对应关系如下:
  CLEAN_ALL -> "cleanAll"
  ENGINE_VALIDATION_CLEAN_POWER_TEST -> "cleanQuery"
  ENGINE_VALIDATION_CLEAN_LOAD_TEST -> "cleanMetastore",
  ENGINE_VALIDATION_CLEAN_DATA -> "cleanData"
  ENGINE_VALIDATION_DATA_GENERATION -> "dataGen"
  ENGINE_VALIDATION_LOAD_TEST -> "populateMetastore"
  ENGINE_VALIDATION_POWER_TEST -> "runQuery"
  ENGINE_VALIDATION_RESULT_VALIDATION -> "validateQuery"
  CLEAN_POWER_TEST -> "cleanQuery"
  CLEAN_THROUGHPUT_TEST_1 -> "cleanQuery"
  CLEAN_THROUGHPUT_TEST_2 -> "cleanQuery"
  CLEAN_LOAD_TEST -> "cleanMetastore"
  CLEAN_DATA -> "cleanData"
  DATA_GENERATION -> "dataGen"
  LOAD_TEST -> "populateMetastore"
  POWER_TEST -> "runQuery"
  THROUGHPUT_TEST -> (String)null
  THROUGHPUT_TEST_1 -> "runQuery"
  THROUGHPUT_TEST_REFRESH -> "refreshMetastore"
  THROUGHPUT_TEST_2 -> "runQuery"
  VALIDATE_POWER_TEST -> "validateQuery"
  VALIDATE_THROUGHPUT_TEST_1 -> "validateQuery"
  VALIDATE_THROUGHPUT_TEST_2 -> "validateQuery"
  SHOW_TIMES -> "showTimes"
  SHOW_ERRORS -> "showErrors"
  SHOW_VALIDATION -> "showValidation"
  当执行某个 benchmarkPhase 时会去调用如上该 benchmarkPhase 对应的 module (脚本位于 $BENCH_MARK_HOME/engines/hive/bin 目录下)
  cmdLine.add(benchmarkPhase.getRunModule());
  程序调用流程
  bigBench.png
  接下来介绍每个module的功能
  module
  cleanAll
  1. DROP DATABASE
  2. 删除hdfs上的源数据
  echo "dropping database (with all tables)"
  runCmdWithErrorCheck runEngineCmd -e "DROP DATABASE IF EXISTS $BIG_BENCH_DATABASE CASCADE;"
  echo "cleaning ${BIG_BENCH_HDFS_ABSOLUTE_HOME}"
  hadoop fs -rm -r -f -skipTrash "${BIG_BENCH_HDFS_ABSOLUTE_HOME}"
  cleanQuery
  1. 删除对应的 Query 生成的临时表
  2. 删除对应的 Query 生成的结果表
  runCmdWithErrorCheck runEngineCmd -e "DROP TABLE IF EXISTS $TEMP_TABLE1; DROP TABLE IF EXISTS $TEMP_TABLE2; DROP TABLE IF EXISTS $RESULT_TABLE;"
  return $?
  cleanMetastore
  1. 调用 `dropTables.sql` 将23张表依次DROP
  echo "cleaning metastore tables"
  runCmdWithErrorCheck runEngineCmd -f "$BIG_BENCH_CLEAN_METASTORE_FILE"
  export BIG_BENCH_CLEAN_METASTORE_FILE="$BIG_BENCH_CLEAN_DIR/dropTables.sql"
  dropTables.sql 将23张表依次DROP,源码如下:
  DROP TABLE IF EXISTS ${hiveconf:customerTableName};
  DROP TABLE IF EXISTS ${hiveconf:customerAddressTableName};
  DROP TABLE IF EXISTS ${hiveconf:customerDemographicsTableName};
  DROP TABLE IF EXISTS ${hiveconf:dateTableName};
  DROP TABLE IF EXISTS ${hiveconf:householdDemographicsTableName};
  DROP TABLE IF EXISTS ${hiveconf:incomeTableName};
  DROP TABLE IF EXISTS ${hiveconf:itemTableName};
  DROP TABLE IF EXISTS ${hiveconf:promotionTableName};
  DROP TABLE IF EXISTS ${hiveconf:reasonTableName};
  DROP TABLE IF EXISTS ${hiveconf:shipModeTableName};
  DROP TABLE IF EXISTS ${hiveconf:storeTableName};
  DROP TABLE IF EXISTS ${hiveconf:timeTableName};
  DROP TABLE IF EXISTS ${hiveconf:warehouseTableName};
  DROP TABLE IF EXISTS ${hiveconf:webSiteTableName};
  DROP TABLE IF EXISTS ${hiveconf:webPageTableName};
  DROP TABLE IF EXISTS ${hiveconf:inventoryTableName};
  DROP TABLE IF EXISTS ${hiveconf:storeSalesTableName};
  DROP TABLE IF EXISTS ${hiveconf:storeReturnsTableName};
  DROP TABLE IF EXISTS ${hiveconf:webSalesTableName};
  DROP TABLE IF EXISTS ${hiveconf:webReturnsTableName};
  DROP TABLE IF EXISTS ${hiveconf:marketPricesTableName};
  DROP TABLE IF EXISTS ${hiveconf:clickstreamsTableName};
  DROP TABLE IF EXISTS ${hiveconf:reviewsTableName};
  cleanData
  1. 删除hdfs上 /user/root/benchmarks/bigbench/data 目录
  2. 删除hdfs上 /user/root/benchmarks/bigbench/data_refresh 目录
  echo "cleaning ${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
  hadoop fs -rm -r -f -skipTrash "${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
  echo "cleaning ${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"
  hadoop fs -rm -r -f -skipTrash "${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"
  dataGen
  1. 创建目录 /user/root/benchmarks/bigbench/data 并赋予权限
  2. 创建目录 /user/root/benchmarks/bigbench/data_refresh 并赋予权限
  3. 调用 HadoopClusterExec.jar 和 pdgf.jar 生成 base data 到 /user/root/benchmarks/bigbench/data 目录下
  4. 调用 HadoopClusterExec.jar 和 pdgf.jar 生成 refresh data 到 /user/root/benchmarks/bigbench/data_refresh 目录下
  创建目录 /user/root/benchmarks/bigbench/data 并赋予权限
  runCmdWithErrorCheck hadoop fs -mkdir -p "${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
  runCmdWithErrorCheck hadoop fs -chmod 777 "${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
  创建目录 /user/root/benchmarks/bigbench/data_refresh 并赋予权限
  runCmdWithErrorCheck hadoop fs -mkdir -p "${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"
  runCmdWithErrorCheck hadoop fs -chmod 777 "${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"
  调用 HadoopClusterExec.jar 和 pdgf.jar 生成 base data
  runCmdWithErrorCheck hadoop jar "${BIG_BENCH_TOOLS_DIR}/HadoopClusterExec.jar" -archives "${PDGF_ARCHIVE_PATH}" ${BIG_BENCH_DATAGEN_HADOOP_EXEC_DEBUG} -taskFailOnNonZeroReturnValue -execCWD "${PDGF_DISTRIBUTED_NODE_DIR}" ${HadoopClusterExecOptions} -exec ${BIG_BENCH_DATAGEN_HADOOP_JVM_ENV} -cp "${HADOOP_CP}:pdgf.jar" ${PDGF_CLUSTER_CONF} pdgf.Controller -nc HadoopClusterExec.tasks -nn HadoopClusterExec.taskNumber -ns -c -sp REFRESH_PHASE 0 -o "'${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}/'+table.getName()+'/'" ${BIG_BENCH_DATAGEN_HADOOP_OPTIONS} -s ${BIG_BENCH_DATAGEN_TABLES} ${PDGF_OPTIONS} "$@" 2>&1 | tee -a "$BIG_BENCH_DATAGEN_STAGE_LOG" 2>&1
  调用 HadoopClusterExec.jar 和 pdgf.jar 生成 refresh data
  runCmdWithErrorCheck hadoop jar "${BIG_BENCH_TOOLS_DIR}/HadoopClusterExec.jar" -archives "${PDGF_ARCHIVE_PATH}" ${BIG_BENCH_DATAGEN_HADOOP_EXEC_DEBUG} -taskFailOnNonZeroReturnValue -execCWD "${PDGF_DISTRIBUTED_NODE_DIR}" ${HadoopClusterExecOptions} -exec ${BIG_BENCH_DATAGEN_HADOOP_JVM_ENV} -cp "${HADOOP_CP}:pdgf.jar" ${PDGF_CLUSTER_CONF} pdgf.Controller -nc HadoopClusterExec.tasks -nn HadoopClusterExec.taskNumber -ns -c -sp REFRESH_PHASE 1 -o "'${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}/'+table.getName()+'/'" ${BIG_BENCH_DATAGEN_HADOOP_OPTIONS} -s ${BIG_BENCH_DATAGEN_TABLES} ${PDGF_OPTIONS} "$@" 2>&1 | tee -a "$BIG_BENCH_DATAGEN_STAGE_LOG" 2>&1
  populateMetastore
  该过程是真正的创建数据库表的过程。建表的过程调用的是 $BENCH_MARK_HOME/engines/hive/population/ 目录下的 hiveCreateLoad.sql ,通过该sql文件来建数据库表。
  从 /user/root/benchmarks/bigbench/data 路径下读取 .dat 的原始数据,生成 TEXTFILE 格式的外部临时表
  用 select * from 临时表 来创建最终的 ORC 格式的数据库表
  删除外部临时表。
  从 /user/root/benchmarks/bigbench/data 路径下读取 .dat 的原始数据,生成 TEXTFILE 格式的外部临时表
DROP TABLE IF EXISTS ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix};
CREATE EXTERNAL TABLE ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
( c_customer_sk             bigint              --not null
, c_customer_id             string              --not null
, c_current_cdemo_sk        bigint
, c_current_hdemo_sk        bigint
, c_current_addr_sk         bigint
, c_first_shipto_date_sk    bigint
, c_first_sales_date_sk     bigint
, c_salutation              string
, c_first_name              string
, c_last_name               string
, c_preferred_cust_flag     string
, c_birth_day               int
, c_birth_month             int
, c_birth_year              int
, c_birth_country           string
, c_login                   string
, c_email_address           string
, c_last_review_date        string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '${hiveconf:fieldDelimiter}'
STORED AS TEXTFILE LOCATION '${hiveconf:hdfsDataPath}/${hiveconf:customerTableName}'
;
  用 select * from 临时表 来创建最终的 ORC 格式的数据库表
DROP TABLE IF EXISTS ${hiveconf:customerTableName};
CREATE TABLE ${hiveconf:customerTableName}
STORED AS ${hiveconf:tableFormat}
AS
SELECT * FROM ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
;
  删除外部临时表
DROP TABLE ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix};
runQuery
1. runQuery 调用每个query下的 run.sh 里的 `query_run_main_method()` 方法
2. `query_run_main_method()` 调用 `runEngineCmd` 来执行query脚本(qxx.sql)
runQuery 调用每个query下的 run.sh 里的 query_run_main_method() 方法
QUERY_MAIN_METHOD="query_run_main_method"
-----------------------------------------
"$QUERY_MAIN_METHOD" 2>&1 | tee -a "$LOG_FILE_NAME" 2>&1
query_run_main_method() 调用 runEngineCmd 来执行query脚本(qxx.sql)
query_run_main_method () {
QUERY_SCRIPT="$QUERY_DIR/$QUERY_NAME.sql"
if [ ! -r "$QUERY_SCRIPT" ]
then
echo "SQL file $QUERY_SCRIPT can not be read."
exit 1
fi
runCmdWithErrorCheck runEngineCmd -f "$QUERY_SCRIPT"
return $?
}
  一般情况下 query_run_main_method () 方法只是执行对应的query脚本,但是像 q05、q20... 这些查询,用到了机器学习算法,所以在执行对应的query脚本后会把生成的结果表作为输入,然后调用执行机器学习算法(如聚类、逻辑回归)的jar包继续执行,得到最终的结果。
runEngineCmd () {
if addInitScriptsToParams
then
"$BINARY" "${BINARY_PARAMS[@]}" "${INIT_PARAMS[@]}" "$@"
else
return 1
fi
}
--------------------------
BINARY="/usr/bin/hive"
BINARY_PARAMS+=(--hiveconf BENCHMARK_PHASE=$BIG_BENCH_BENCHMARK_PHASE --hiveconf STREAM_NUMBER=$BIG_BENCH_STREAM_NUMBER --hiveconf QUERY_NAME=$QUERY_NAME --hiveconf QUERY_DIR=$QUERY_DIR --hiveconf RESULT_TABLE=$RESULT_TABLE --hiveconf RESULT_DIR=$RESULT_DIR --hiveconf TEMP_TABLE=$TEMP_TABLE --hiveconf TEMP_DIR=$TEMP_DIR --hiveconf TABLE_PREFIX=$TABLE_PREFIX)
INIT_PARAMS=(-i "$BIG_BENCH_QUERY_PARAMS_FILE" -i "$BIG_BENCH_ENGINE_SETTINGS_FILE")
INIT_PARAMS+=(-i "$LOCAL_QUERY_ENGINE_SETTINGS_FILE")
if [ -n "$USER_QUERY_PARAMS_FILE" ]
then
if [ -r "$USER_QUERY_PARAMS_FILE" ]
then
echo "User defined query parameter file found. Adding $USER_QUERY_PARAMS_FILE to hive init."
INIT_PARAMS+=(-i "$USER_QUERY_PARAMS_FILE")
else
echo "User query parameter file $USER_QUERY_PARAMS_FILE can not be read."
return 1
fi
fi
if [ -n "$USER_ENGINE_SETTINGS_FILE" ]
then
if [ -r "$USER_ENGINE_SETTINGS_FILE" ]
then
echo "User defined engine settings file found. Adding $USER_ENGINE_SETTINGS_FILE to hive init."
INIT_PARAMS+=(-i "$USER_ENGINE_SETTINGS_FILE")
else
echo "User hive settings file $USER_ENGINE_SETTINGS_FILE can not be read."
return 1
fi
fi
return 0
  validateQuery
  1. 调用每个query下的 run.sh 里的 `query_run_validate_method()` 方法
  2. `query_run_validate_method()` 比较 `$BENCH_MARK_HOME/engines/hive/queries/qxx/results/qxx-result` 和hdfs上 `/user/root/benchmarks/bigbench/queryResults/qxx_hive_${BIG_BENCH_BENCHMARK_PHASE}_${BIG_BENCH_STREAM_NUMBER}_result` 两个文件,如果一样,则验证通过,否则验证失败。
if diff -q "$VALIDATION_RESULTS_FILENAME" <(hadoop fs -cat "$RESULT_DIR/*")
then
echo "Validation of $VALIDATION_RESULTS_FILENAME passed: Query returned correct results"
else
echo "Validation of $VALIDATION_RESULTS_FILENAME failed: Query returned incorrect results"
VALIDATION_PASSED="0"
fi
  SF为1时(-f 1),用上面的方法比较,SF不为1(>1)时,只要hdfs上的结果表中行数大于等于1即验证通过
if [ `hadoop fs -cat "$RESULT_DIR/*" | head -n 10 | wc -l` -ge 1 ]
then
echo "Validation passed: Query returned results"
else
echo "Validation failed: Query did not return results"
return 1
fi
refreshMetastore
  1. 调用 `$BENCH_MARK_HOME/engines/hive/refresh/` 目录下的 `hiveRefreshCreateLoad.sql` 脚本
  2. `hiveRefreshCreateLoad.sql` 将hdfs上 `/user/root/benchmarks/bigbench/data_refresh/` 目录下每个表数据插入外部临时表
  3. 外部临时表再将每个表的数据插入Hive数据库对应的表中
hiveRefreshCreateLoad.sql 将hdfs上 /user/root/benchmarks/bigbench/data_refresh/ 目录下每个表数据插入外部临时表
DROP TABLE IF EXISTS ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix};
CREATE EXTERNAL TABLE ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
( c_customer_sk             bigint              --not null
, c_customer_id             string              --not null
, c_current_cdemo_sk        bigint
, c_current_hdemo_sk        bigint
, c_current_addr_sk         bigint
, c_first_shipto_date_sk    bigint
, c_first_sales_date_sk     bigint
, c_salutation              string
, c_first_name              string
, c_last_name               string
, c_preferred_cust_flag     string
, c_birth_day               int
, c_birth_month             int
, c_birth_year              int
, c_birth_country           string
, c_login                   string
, c_email_address           string
, c_last_review_date        string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '${hiveconf:fieldDelimiter}'
STORED AS TEXTFILE LOCATION '${hiveconf:hdfsDataPath}/${hiveconf:customerTableName}'
;
-----------------
set hdfsDataPath=${env:BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR};
外部临时表再将每个表的数据插入Hive数据库对应的表中
INSERT INTO TABLE ${hiveconf:customerTableName}
SELECT * FROM ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
;
(第 2/2 页)
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号