run()
The run() method is the core method in RunBigBench.jar: all execution is driven through it. run() calls methods such as runQueries(), runModule() and generateData(), which in turn call runCmd() to create operating-system processes that execute bash commands and invoke bash scripts.
Inside run(), a while loop walks through every benchmarkPhase in the workload; depending on the benchmarkPhase, it dispatches to runQueries(), runModule(), generateData(), and so on.
try {
    long e = 0L;
    this.log.finer("Benchmark phases: " + this.benchmarkPhases);
    Iterator startCheckpoint = this.benchmarkPhases.iterator();
    long throughputStart;
    while (startCheckpoint.hasNext()) {
        BigBench.BenchmarkPhase children = (BigBench.BenchmarkPhase) startCheckpoint.next();
        if (children.isPhaseDone()) {
            this.log.info("The phase " + children.name() + " was already performed earlier. Skipping this phase");
        } else {
            try {
                switch ($SWITCH_TABLE$io$bigdatabenchmark$v1$driver$BigBench$BenchmarkPhase()[children.ordinal()]) {
                case 1:
                case 20:
                    throw new IllegalArgumentException("The value " + children.name() + " is only used internally.");
                case 2:
                    this.log.info(children.getConsoleMessage());
                    e = System.currentTimeMillis();
                    break;
                case 3:
                    if (!BigBench.BenchmarkPhase.BENCHMARK_START.isPhaseDone()) {
                        throw new IllegalArgumentException("Error: Cannot stop the benchmark before starting it");
                    }
                    throughputStart = System.currentTimeMillis();
                    this.log.info(String.format("%-55s finished. Time: %25s", new Object[]{children.getConsoleMessage(), BigBench.Helper.formatTime(throughputStart - e)}));
                    this.logTreeRoot.setCheckpoint(new BigBench.Checkpoint(BigBench.BenchmarkPhase.BENCHMARK, -1L, -1L, e, throughputStart, this.logTreeRoot.isSuccessful()));
                    break;
                case 4: case 15: case 18: case 22: case 27: case 28: case 29:
                    this.runModule(children, this.userArguments);
                    break;
                case 5: case 10: case 11:
                    this.runQueries(children, 1, validationArguments);
                    break;
                case 6: case 9:
                    this.runModule(children, validationArguments);
                    break;
                case 7:
                    this.generateData(children, false, validationArguments);
                    break;
                case 8:
                    this.generateData(children, true, validationArguments);
                    break;
                case 12: case 19: case 24:
                    this.runQueries(children, 1, this.userArguments);
                    break;
                case 13: case 14: case 21: case 23: case 25: case 26:
                    this.runQueries(children, this.numberOfParallelStreams, this.userArguments);
                    break;
                case 16:
                    this.generateData(children, false, this.userArguments);
                    break;
                case 17:
                    this.generateData(children, true, this.userArguments);
                }
                children.setPhaseDone(true);
            } catch (IOException var21) {
                this.log.info("==============\nBenchmark run terminated\nReason: An error occured while running a command in phase " + children + "\n==============");
                var21.printStackTrace();
                if (this.stopAfterFailure || children.mustSucceed()) {
                    break;
                }
            }
        }
    }
The case 1–29 labels here are not queries 1–29; they are the 29 BenchmarkPhase constants of the enum, shown below:
private static enum BenchmarkPhase {
    BENCHMARK((String)null, "benchmark", false, false, false, false, "BigBench benchmark"),
    BENCHMARK_START((String)null, "benchmark_start", false, false, false, false, "BigBench benchmark: Start"),
    BENCHMARK_STOP((String)null, "benchmark_stop", false, false, false, false, "BigBench benchmark: Stop"),
    CLEAN_ALL("cleanAll", "clean_all", false, false, false, false, "BigBench clean all"),
    ENGINE_VALIDATION_CLEAN_POWER_TEST("cleanQuery", "engine_validation_power_test", false, false, false, false, "BigBench engine validation: Clean power test queries"),
    ENGINE_VALIDATION_CLEAN_LOAD_TEST("cleanMetastore", "engine_validation_metastore", false, false, false, false, "BigBench engine validation: Clean metastore"),
    ENGINE_VALIDATION_CLEAN_DATA("cleanData", "engine_validation_data", false, false, false, false, "BigBench engine validation: Clean data"),
    ENGINE_VALIDATION_DATA_GENERATION("dataGen", "engine_validation_data", false, false, false, true, "BigBench engine validation: Data generation"),
    ENGINE_VALIDATION_LOAD_TEST("populateMetastore", "engine_validation_metastore", false, false, false, true, "BigBench engine validation: Populate metastore"),
    ENGINE_VALIDATION_POWER_TEST("runQuery", "engine_validation_power_test", false, false, false, false, "BigBench engine validation: Power test"),
    ENGINE_VALIDATION_RESULT_VALIDATION("validateQuery", "engine_validation_power_test", false, false, true, false, "BigBench engine validation: Check all query results"),
    CLEAN_POWER_TEST("cleanQuery", "power_test", false, false, false, false, "BigBench clean: Clean power test queries"),
    CLEAN_THROUGHPUT_TEST_1("cleanQuery", "throughput_test_1", false, false, false, false, "BigBench clean: Clean first throughput test queries"),
    CLEAN_THROUGHPUT_TEST_2("cleanQuery", "throughput_test_2", false, false, false, false, "BigBench clean: Clean second throughput test queries"),
    CLEAN_LOAD_TEST("cleanMetastore", "metastore", false, false, false, false, "BigBench clean: Load test"),
    CLEAN_DATA("cleanData", "data", false, false, false, false, "BigBench clean: Data"),
    DATA_GENERATION("dataGen", "data", false, false, false, true, "BigBench preparation: Data generation"),
    LOAD_TEST("populateMetastore", "metastore", false, false, false, true, "BigBench phase 1: Load test"),
    POWER_TEST("runQuery", "power_test", false, true, false, false, "BigBench phase 2: Power test"),
    THROUGHPUT_TEST((String)null, "throughput_test", false, false, false, false, "BigBench phase 3: Throughput test"),
    THROUGHPUT_TEST_1("runQuery", "throughput_test_1", true, true, false, false, "BigBench phase 3: First throughput test run"),
    THROUGHPUT_TEST_REFRESH("refreshMetastore", "throughput_test_refresh", false, false, false, false, "BigBench phase 3: Throughput test data refresh"),
    THROUGHPUT_TEST_2("runQuery", "throughput_test_2", true, true, false, false, "BigBench phase 3: Second throughput test run"),
    VALIDATE_POWER_TEST("validateQuery", "power_test", false, false, true, false, "BigBench validation: Power test results"),
    VALIDATE_THROUGHPUT_TEST_1("validateQuery", "throughput_test_1", false, false, true, false, "BigBench validation: First throughput test results"),
    VALIDATE_THROUGHPUT_TEST_2("validateQuery", "throughput_test_2", false, false, true, false, "BigBench validation: Second throughput test results"),
    SHOW_TIMES("showTimes", "show_times", false, false, true, false, "BigBench: show query times"),
    SHOW_ERRORS("showErrors", "show_errors", false, false, true, false, "BigBench: show query errors"),
    SHOW_VALIDATION("showValidation", "show_validation", false, false, true, false, "BigBench: show query validation results");

    private String runModule;
    private String namePattern;
    private boolean queryOrderRandom;
    private boolean queryOrderCached;
    private boolean printStdOut;
    private boolean mustSucceed;
    private String consoleMessage;
    private boolean phaseDone;

    private BenchmarkPhase(String runModule, String namePattern, boolean queryOrderRandom, boolean queryOrderCached, boolean printStdOut, boolean mustSucceed, String consoleMessage) {
        this.runModule = runModule;
        this.namePattern = namePattern;
        this.queryOrderRandom = queryOrderRandom;
        this.queryOrderCached = queryOrderCached;
        this.printStdOut = printStdOut;
        this.mustSucceed = mustSucceed;
        this.consoleMessage = consoleMessage;
        this.phaseDone = false;
    }
Case 3 corresponds to BENCHMARK_STOP, case 4 to CLEAN_ALL, case 29 to SHOW_VALIDATION, and so on: case N selects the N-th enum constant in declaration order.
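This mapping can be illustrated with a trimmed-down sketch. The enum below is an illustrative stand-in for the real BigBench.BenchmarkPhase, not a copy of it: javac compiles a switch over an enum into a switch on a synthetic $SWITCH_TABLE$ array that maps each constant's 0-based ordinal() to a 1-based case label.

```java
// Illustrative stand-in for BigBench.BenchmarkPhase (first four constants only).
// The decompiled $SWITCH_TABLE$ stores ordinal() + 1 for each constant, which is
// why "case N" in the decompiled switch means the N-th constant in declaration order.
enum Phase {
    BENCHMARK, BENCHMARK_START, BENCHMARK_STOP, CLEAN_ALL; // ... 25 more in the real enum

    int caseLabel() {
        return ordinal() + 1; // what the synthetic switch table stores
    }
}

public class PhaseOrdinalDemo {
    public static void main(String[] args) {
        System.out.println(Phase.BENCHMARK_STOP.caseLabel()); // 3
        System.out.println(Phase.CLEAN_ALL.caseLabel());      // 4
    }
}
```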
From this switch we can see:
The benchmarkPhases CLEAN_ALL, CLEAN_LOAD_TEST, LOAD_TEST, THROUGHPUT_TEST_REFRESH, SHOW_TIMES, SHOW_ERRORS and SHOW_VALIDATION call
this.runModule(children, this.userArguments);
that is, the runModule method with this.userArguments as the argument.
ENGINE_VALIDATION_CLEAN_POWER_TEST, ENGINE_VALIDATION_POWER_TEST and ENGINE_VALIDATION_RESULT_VALIDATION call
this.runQueries(children, 1, validationArguments);
that is, the runQueries method with 1 (the stream number) and validationArguments as arguments.
ENGINE_VALIDATION_CLEAN_LOAD_TEST and ENGINE_VALIDATION_LOAD_TEST call
this.runModule(children, validationArguments);
ENGINE_VALIDATION_CLEAN_DATA calls
this.generateData(children, false, validationArguments);
ENGINE_VALIDATION_DATA_GENERATION calls
this.generateData(children, true, validationArguments);
CLEAN_POWER_TEST, POWER_TEST and VALIDATE_POWER_TEST call
this.runQueries(children, 1, this.userArguments);
CLEAN_THROUGHPUT_TEST_1, CLEAN_THROUGHPUT_TEST_2, THROUGHPUT_TEST_1, THROUGHPUT_TEST_2, VALIDATE_THROUGHPUT_TEST_1 and VALIDATE_THROUGHPUT_TEST_2 call
this.runQueries(children, this.numberOfParallelStreams, this.userArguments);
CLEAN_DATA calls
this.generateData(children, false, this.userArguments);
DATA_GENERATION calls
this.generateData(children, true, this.userArguments);
Summarizing the calls above:
BenchmarkPhases related to ENGINE_VALIDATION all use validationArguments; the rest use userArguments. (The only difference between validationArguments and userArguments is that validationArguments fixes SCALE_FACTOR at 1.)
Phases related to POWER_TEST all call runQueries(), because a power test is exactly the execution of the SQL queries.
Phases related to CLEAN_DATA and DATA_GENERATION all call generateData().
Phases related to LOAD_TEST and the SHOW_* phases all call runModule().
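The arguments rule can be sketched as follows. This is a hedged model: the decompiled snippet does not show the driver's real argument types, so the map-based representation and the helper name are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two argument sets: validationArguments behaves
// like a copy of userArguments with SCALE_FACTOR pinned to 1, which is the
// only difference described in the text.
public class ValidationArgsDemo {
    static Map<String, String> validationArgsFrom(Map<String, String> userArgs) {
        Map<String, String> v = new HashMap<>(userArgs); // everything else is inherited
        v.put("SCALE_FACTOR", "1");                      // engine validation always runs at SF 1
        return v;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put("SCALE_FACTOR", "1000");
        user.put("DATABASE", "bigbench");

        Map<String, String> validation = validationArgsFrom(user);
        System.out.println(validation.get("SCALE_FACTOR")); // 1
        System.out.println(validation.get("DATABASE"));     // bigbench
    }
}
```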
Mapping between benchmarkPhase and module
The exact mapping from each benchmarkPhase to the module (the script it executes) is as follows:
CLEAN_ALL -> "cleanAll"
ENGINE_VALIDATION_CLEAN_POWER_TEST -> "cleanQuery"
ENGINE_VALIDATION_CLEAN_LOAD_TEST -> "cleanMetastore"
ENGINE_VALIDATION_CLEAN_DATA -> "cleanData"
ENGINE_VALIDATION_DATA_GENERATION -> "dataGen"
ENGINE_VALIDATION_LOAD_TEST -> "populateMetastore"
ENGINE_VALIDATION_POWER_TEST -> "runQuery"
ENGINE_VALIDATION_RESULT_VALIDATION -> "validateQuery"
CLEAN_POWER_TEST -> "cleanQuery"
CLEAN_THROUGHPUT_TEST_1 -> "cleanQuery"
CLEAN_THROUGHPUT_TEST_2 -> "cleanQuery"
CLEAN_LOAD_TEST -> "cleanMetastore"
CLEAN_DATA -> "cleanData"
DATA_GENERATION -> "dataGen"
LOAD_TEST -> "populateMetastore"
POWER_TEST -> "runQuery"
THROUGHPUT_TEST -> (String)null
THROUGHPUT_TEST_1 -> "runQuery"
THROUGHPUT_TEST_REFRESH -> "refreshMetastore"
THROUGHPUT_TEST_2 -> "runQuery"
VALIDATE_POWER_TEST -> "validateQuery"
VALIDATE_THROUGHPUT_TEST_1 -> "validateQuery"
VALIDATE_THROUGHPUT_TEST_2 -> "validateQuery"
SHOW_TIMES -> "showTimes"
SHOW_ERRORS -> "showErrors"
SHOW_VALIDATION -> "showValidation"
When a benchmarkPhase is executed, the driver invokes the module mapped to it above (the scripts live under the $BENCH_MARK_HOME/engines/hive/bin directory):
cmdLine.add(benchmarkPhase.getRunModule());
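A hedged sketch of how the command line might be assembled around that call before runCmd() spawns the process. Only the getRunModule() argument is shown in the source; the wrapper-script path and the surrounding list construction here are illustrative assumptions, not the real RunBigBench code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative command-line assembly: the driver appends the phase's module
// name to a bash invocation, which runCmd() would then spawn as an OS process.
public class CmdLineDemo {
    static List<String> buildCmdLine(String bigBenchHome, String runModule) {
        List<String> cmdLine = new ArrayList<>();
        cmdLine.add("bash");
        cmdLine.add(bigBenchHome + "/bin/bigBench"); // assumed wrapper script location
        cmdLine.add(runModule);                      // e.g. "cleanAll", "runQuery", "dataGen"
        // spawning would look like: new ProcessBuilder(cmdLine).inheritIO().start()
        return cmdLine;
    }

    public static void main(String[] args) {
        System.out.println(buildCmdLine("/opt/Big-Bench", "cleanAll"));
    }
}
```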
Program call flow
(Figure: bigBench.png)
The sections below describe what each module does.
module
cleanAll
1. Drop the database
2. Delete the source data on HDFS
echo "dropping database (with all tables)"
runCmdWithErrorCheck runEngineCmd -e "DROP DATABASE IF EXISTS $BIG_BENCH_DATABASE CASCADE;"
echo "cleaning ${BIG_BENCH_HDFS_ABSOLUTE_HOME}"
hadoop fs -rm -r -f -skipTrash "${BIG_BENCH_HDFS_ABSOLUTE_HOME}"
cleanQuery
1. Drop the temporary tables generated by the corresponding query
2. Drop the result table generated by the corresponding query
runCmdWithErrorCheck runEngineCmd -e "DROP TABLE IF EXISTS $TEMP_TABLE1; DROP TABLE IF EXISTS $TEMP_TABLE2; DROP TABLE IF EXISTS $RESULT_TABLE;"
return $?
cleanMetastore
1. Invoke `dropTables.sql` to DROP the 23 tables one by one
echo "cleaning metastore tables"
runCmdWithErrorCheck runEngineCmd -f "$BIG_BENCH_CLEAN_METASTORE_FILE"
export BIG_BENCH_CLEAN_METASTORE_FILE="$BIG_BENCH_CLEAN_DIR/dropTables.sql"
dropTables.sql drops the 23 tables in turn; its source is:
DROP TABLE IF EXISTS ${hiveconf:customerTableName};
DROP TABLE IF EXISTS ${hiveconf:customerAddressTableName};
DROP TABLE IF EXISTS ${hiveconf:customerDemographicsTableName};
DROP TABLE IF EXISTS ${hiveconf:dateTableName};
DROP TABLE IF EXISTS ${hiveconf:householdDemographicsTableName};
DROP TABLE IF EXISTS ${hiveconf:incomeTableName};
DROP TABLE IF EXISTS ${hiveconf:itemTableName};
DROP TABLE IF EXISTS ${hiveconf:promotionTableName};
DROP TABLE IF EXISTS ${hiveconf:reasonTableName};
DROP TABLE IF EXISTS ${hiveconf:shipModeTableName};
DROP TABLE IF EXISTS ${hiveconf:storeTableName};
DROP TABLE IF EXISTS ${hiveconf:timeTableName};
DROP TABLE IF EXISTS ${hiveconf:warehouseTableName};
DROP TABLE IF EXISTS ${hiveconf:webSiteTableName};
DROP TABLE IF EXISTS ${hiveconf:webPageTableName};
DROP TABLE IF EXISTS ${hiveconf:inventoryTableName};
DROP TABLE IF EXISTS ${hiveconf:storeSalesTableName};
DROP TABLE IF EXISTS ${hiveconf:storeReturnsTableName};
DROP TABLE IF EXISTS ${hiveconf:webSalesTableName};
DROP TABLE IF EXISTS ${hiveconf:webReturnsTableName};
DROP TABLE IF EXISTS ${hiveconf:marketPricesTableName};
DROP TABLE IF EXISTS ${hiveconf:clickstreamsTableName};
DROP TABLE IF EXISTS ${hiveconf:reviewsTableName};
cleanData
1. Delete the /user/root/benchmarks/bigbench/data directory on HDFS
2. Delete the /user/root/benchmarks/bigbench/data_refresh directory on HDFS
echo "cleaning ${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
hadoop fs -rm -r -f -skipTrash "${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
echo "cleaning ${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"
hadoop fs -rm -r -f -skipTrash "${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"
dataGen
1. Create the /user/root/benchmarks/bigbench/data directory and set its permissions
2. Create the /user/root/benchmarks/bigbench/data_refresh directory and set its permissions
3. Call HadoopClusterExec.jar and pdgf.jar to generate the base data into /user/root/benchmarks/bigbench/data
4. Call HadoopClusterExec.jar and pdgf.jar to generate the refresh data into /user/root/benchmarks/bigbench/data_refresh
Create the /user/root/benchmarks/bigbench/data directory and set its permissions:
runCmdWithErrorCheck hadoop fs -mkdir -p "${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
runCmdWithErrorCheck hadoop fs -chmod 777 "${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
Create the /user/root/benchmarks/bigbench/data_refresh directory and set its permissions:
runCmdWithErrorCheck hadoop fs -mkdir -p "${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"
runCmdWithErrorCheck hadoop fs -chmod 777 "${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"
Call HadoopClusterExec.jar and pdgf.jar to generate the base data:
runCmdWithErrorCheck hadoop jar "${BIG_BENCH_TOOLS_DIR}/HadoopClusterExec.jar" -archives "${PDGF_ARCHIVE_PATH}" ${BIG_BENCH_DATAGEN_HADOOP_EXEC_DEBUG} -taskFailOnNonZeroReturnValue -execCWD "${PDGF_DISTRIBUTED_NODE_DIR}" ${HadoopClusterExecOptions} -exec ${BIG_BENCH_DATAGEN_HADOOP_JVM_ENV} -cp "${HADOOP_CP}:pdgf.jar" ${PDGF_CLUSTER_CONF} pdgf.Controller -nc HadoopClusterExec.tasks -nn HadoopClusterExec.taskNumber -ns -c -sp REFRESH_PHASE 0 -o "'${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}/'+table.getName()+'/'" ${BIG_BENCH_DATAGEN_HADOOP_OPTIONS} -s ${BIG_BENCH_DATAGEN_TABLES} ${PDGF_OPTIONS} "$@" 2>&1 | tee -a "$BIG_BENCH_DATAGEN_STAGE_LOG" 2>&1
Call HadoopClusterExec.jar and pdgf.jar to generate the refresh data:
runCmdWithErrorCheck hadoop jar "${BIG_BENCH_TOOLS_DIR}/HadoopClusterExec.jar" -archives "${PDGF_ARCHIVE_PATH}" ${BIG_BENCH_DATAGEN_HADOOP_EXEC_DEBUG} -taskFailOnNonZeroReturnValue -execCWD "${PDGF_DISTRIBUTED_NODE_DIR}" ${HadoopClusterExecOptions} -exec ${BIG_BENCH_DATAGEN_HADOOP_JVM_ENV} -cp "${HADOOP_CP}:pdgf.jar" ${PDGF_CLUSTER_CONF} pdgf.Controller -nc HadoopClusterExec.tasks -nn HadoopClusterExec.taskNumber -ns -c -sp REFRESH_PHASE 1 -o "'${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}/'+table.getName()+'/'" ${BIG_BENCH_DATAGEN_HADOOP_OPTIONS} -s ${BIG_BENCH_DATAGEN_TABLES} ${PDGF_OPTIONS} "$@" 2>&1 | tee -a "$BIG_BENCH_DATAGEN_STAGE_LOG" 2>&1
populateMetastore
This is the step that actually creates the database tables. It runs hiveCreateLoad.sql from the $BENCH_MARK_HOME/engines/hive/population/ directory, which:
1. Reads the raw .dat data under /user/root/benchmarks/bigbench/data and creates external temporary tables in TEXTFILE format
2. Creates the final ORC-format tables via SELECT * FROM the temporary tables
3. Drops the external temporary tables
Read the raw .dat data under /user/root/benchmarks/bigbench/data and create an external TEXTFILE temporary table:
DROP TABLE IF EXISTS ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix};
CREATE EXTERNAL TABLE ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
  ( c_customer_sk          bigint --not null
  , c_customer_id          string --not null
  , c_current_cdemo_sk     bigint
  , c_current_hdemo_sk     bigint
  , c_current_addr_sk      bigint
  , c_first_shipto_date_sk bigint
  , c_first_sales_date_sk  bigint
  , c_salutation           string
  , c_first_name           string
  , c_last_name            string
  , c_preferred_cust_flag  string
  , c_birth_day            int
  , c_birth_month          int
  , c_birth_year           int
  , c_birth_country        string
  , c_login                string
  , c_email_address        string
  , c_last_review_date     string
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '${hiveconf:fieldDelimiter}'
  STORED AS TEXTFILE LOCATION '${hiveconf:hdfsDataPath}/${hiveconf:customerTableName}'
;
Create the final ORC-format table via SELECT * FROM the temporary table:
DROP TABLE IF EXISTS ${hiveconf:customerTableName};
CREATE TABLE ${hiveconf:customerTableName}
  STORED AS ${hiveconf:tableFormat}
AS
SELECT * FROM ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
;
Drop the external temporary table:
DROP TABLE ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix};
runQuery
1. runQuery invokes the `query_run_main_method()` function in each query's run.sh
2. `query_run_main_method()` calls `runEngineCmd` to execute the query script (qxx.sql)
runQuery invokes the query_run_main_method() function in each query's run.sh:
QUERY_MAIN_METHOD="query_run_main_method"
-----------------------------------------
"$QUERY_MAIN_METHOD" 2>&1 | tee -a "$LOG_FILE_NAME" 2>&1
query_run_main_method() calls runEngineCmd to execute the query script (qxx.sql):
query_run_main_method () {
    QUERY_SCRIPT="$QUERY_DIR/$QUERY_NAME.sql"
    if [ ! -r "$QUERY_SCRIPT" ]
    then
        echo "SQL file $QUERY_SCRIPT can not be read."
        exit 1
    fi
    runCmdWithErrorCheck runEngineCmd -f "$QUERY_SCRIPT"
    return $?
}
Usually query_run_main_method() just executes the corresponding query script, but queries such as q05, q20... use machine-learning algorithms: after the query script has run, the resulting table is fed as input to a jar that runs the ML algorithm (e.g. clustering or logistic regression), producing the final result.
runEngineCmd () {
    if addInitScriptsToParams
    then
        "$BINARY" "${BINARY_PARAMS[@]}" "${INIT_PARAMS[@]}" "$@"
    else
        return 1
    fi
}
--------------------------
BINARY="/usr/bin/hive"
BINARY_PARAMS+=(--hiveconf BENCHMARK_PHASE=$BIG_BENCH_BENCHMARK_PHASE --hiveconf STREAM_NUMBER=$BIG_BENCH_STREAM_NUMBER --hiveconf QUERY_NAME=$QUERY_NAME --hiveconf QUERY_DIR=$QUERY_DIR --hiveconf RESULT_TABLE=$RESULT_TABLE --hiveconf RESULT_DIR=$RESULT_DIR --hiveconf TEMP_TABLE=$TEMP_TABLE --hiveconf TEMP_DIR=$TEMP_DIR --hiveconf TABLE_PREFIX=$TABLE_PREFIX)
INIT_PARAMS=(-i "$BIG_BENCH_QUERY_PARAMS_FILE" -i "$BIG_BENCH_ENGINE_SETTINGS_FILE")
INIT_PARAMS+=(-i "$LOCAL_QUERY_ENGINE_SETTINGS_FILE")
if [ -n "$USER_QUERY_PARAMS_FILE" ]
then
    if [ -r "$USER_QUERY_PARAMS_FILE" ]
    then
        echo "User defined query parameter file found. Adding $USER_QUERY_PARAMS_FILE to hive init."
        INIT_PARAMS+=(-i "$USER_QUERY_PARAMS_FILE")
    else
        echo "User query parameter file $USER_QUERY_PARAMS_FILE can not be read."
        return 1
    fi
fi
if [ -n "$USER_ENGINE_SETTINGS_FILE" ]
then
    if [ -r "$USER_ENGINE_SETTINGS_FILE" ]
    then
        echo "User defined engine settings file found. Adding $USER_ENGINE_SETTINGS_FILE to hive init."
        INIT_PARAMS+=(-i "$USER_ENGINE_SETTINGS_FILE")
    else
        echo "User hive settings file $USER_ENGINE_SETTINGS_FILE can not be read."
        return 1
    fi
fi
return 0
validateQuery
1. Invokes the `query_run_validate_method()` function in each query's run.sh
2. `query_run_validate_method()` compares `$BENCH_MARK_HOME/engines/hive/queries/qxx/results/qxx-result` with `/user/root/benchmarks/bigbench/queryResults/qxx_hive_${BIG_BENCH_BENCHMARK_PHASE}_${BIG_BENCH_STREAM_NUMBER}_result` on HDFS; if the two files are identical, validation passes, otherwise it fails.
if diff -q "$VALIDATION_RESULTS_FILENAME" <(hadoop fs -cat "$RESULT_DIR/*")
then
    echo "Validation of $VALIDATION_RESULTS_FILENAME passed: Query returned correct results"
else
    echo "Validation of $VALIDATION_RESULTS_FILENAME failed: Query returned incorrect results"
    VALIDATION_PASSED="0"
fi
When SF is 1 (-f 1), the comparison above is used; when SF is greater than 1, validation passes as long as the result table on HDFS contains at least one row:
if [ `hadoop fs -cat "$RESULT_DIR/*" | head -n 10 | wc -l` -ge 1 ]
then
    echo "Validation passed: Query returned results"
else
    echo "Validation failed: Query did not return results"
    return 1
fi
refreshMetastore
1. Invokes the `hiveRefreshCreateLoad.sql` script in the `$BENCH_MARK_HOME/engines/hive/refresh/` directory
2. `hiveRefreshCreateLoad.sql` loads each table's data from the HDFS `/user/root/benchmarks/bigbench/data_refresh/` directory into external temporary tables
3. The external temporary tables' data is then inserted into the corresponding tables in the Hive database
hiveRefreshCreateLoad.sql loads each table's data from the HDFS /user/root/benchmarks/bigbench/data_refresh/ directory into an external temporary table:
DROP TABLE IF EXISTS ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix};
CREATE EXTERNAL TABLE ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
  ( c_customer_sk          bigint --not null
  , c_customer_id          string --not null
  , c_current_cdemo_sk     bigint
  , c_current_hdemo_sk     bigint
  , c_current_addr_sk      bigint
  , c_first_shipto_date_sk bigint
  , c_first_sales_date_sk  bigint
  , c_salutation           string
  , c_first_name           string
  , c_last_name            string
  , c_preferred_cust_flag  string
  , c_birth_day            int
  , c_birth_month          int
  , c_birth_year           int
  , c_birth_country        string
  , c_login                string
  , c_email_address        string
  , c_last_review_date     string
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '${hiveconf:fieldDelimiter}'
  STORED AS TEXTFILE LOCATION '${hiveconf:hdfsDataPath}/${hiveconf:customerTableName}'
;
-----------------
set hdfsDataPath=${env:BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR};
The external temporary table's data is then inserted into the corresponding table in the Hive database:
INSERT INTO TABLE ${hiveconf:customerTableName}
SELECT * FROM ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
;