IV Usage Examples
Computation on Hive source data with Apache Griffin
Local walkthrough:
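(1) Open the Apache Griffin web UI.
(2) Go to the measure configuration page.
(3) Configure the source data and the target data, along with the result data of the corresponding measure.
(4) Follow the steps to configure the engine results.
(5) Configure the Job that runs the task.
(6) Click Save.
Setting up the Job:
Configure the Job on the Job configuration page.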
Computation on Kafka source data with Apache Griffin
http://griffin.apache.org/docs/usecases.html (not yet analyzed; we will not be using streaming data sources)
V Source Code Analysis
https://github.com/apache/griffin, based on version griffin-0.4.0-rc0
Personal GitHub: https://github.com/zcswl7961/apache-griffin-expand
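Source modules:
service: Spring Boot code; handles the web configuration and serves the data behind the monitoring UI
measure: Scala code; the Spark jobs that run on a schedule
ui: the front-end UI
Data dependency configuration:
application.properties
env: streaming and batch processing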
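1. Job scheduling source code
(1) After the front end saves a job, it calls the POST /jobs method of JobController, which checks whether the job is a batch job or a streaming job. For a batch job, it creates the BatchJob data and then saves the job in the local Quartz store.
It also calls jobService.addJob(triggerKey, batchJob, BATCH); to create the scheduled task, which runs the SparkSubmitJob job.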
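The branch described above can be sketched in Python as follows. This only illustrates the control flow and is not Griffin's Java code: `Scheduler`, `create_job`, the `job_type` request key and the trigger-key format are hypothetical names introduced here, and the in-memory dictionary merely stands in for Quartz. The text describes only the batch branch, so the streaming branch is left unimplemented.

```python
from dataclasses import dataclass, field


@dataclass
class Scheduler:
    """Hypothetical in-memory stand-in for the Quartz job store."""
    jobs: dict = field(default_factory=dict)

    def add_job(self, trigger_key: str, job: dict, process_type: str) -> None:
        # Record the job under its trigger key. A real scheduler would also
        # fire the job (SparkSubmitJob in the text) on its schedule.
        self.jobs[trigger_key] = {"job": job, "process_type": process_type}


def create_job(scheduler: Scheduler, request: dict) -> str:
    """Check the job type, then build and schedule a batch job."""
    job_type = str(request.get("job_type", "")).upper()  # hypothetical request key
    if job_type == "BATCH":
        batch_job = {"kind": "BatchJob", "name": request["name"]}
        trigger_key = "trigger_" + request["name"]  # hypothetical key format
        scheduler.add_job(trigger_key, batch_job, "BATCH")
        return trigger_key
    if job_type == "STREAMING":
        # The text only describes the batch branch.
        raise NotImplementedError("streaming jobs are not covered here")
    raise ValueError(f"unknown job type: {job_type!r}")
```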
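SparkSubmitJob runs through the following calls:
initParam(jd); initializes the parameters: it reads the measure and the jobInstance from the JobDetail and fetches the livy.url configuration.
setLivyConf(); builds the parameters for the Livy submission and assigns them to the livyConfMap instance. This is mostly a matter of parsing the sparkProperties.json file, with an extra raw argument appended.
saveJobInstance(jd); submits the Spark job through Livy and stores the execution record of the current run in the local Quartz database. Inside saveJobInstance, the post2Livy method runs first, and it begins by building the HttpEntity for the Livy submission.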
The final Livy submission calls the endpoint http://192.168.239.171:8998/batches with the following body:
{
"file":"hdfs://localhost:8020/griffin/griffin-measure.jar",
"className":"org.apache.griffin.measure.Application",
"name":"griffin",
"queue":"default",
"numExecutors":2,
"executorCores":1,
"driverMemory":"1g",
"executorMemory":"1g",
"conf":{
"spark.yarn.dist.files":"hdfs://localhost:8020/home/spark_conf/hive-site.xml"
},
"files":[
],
"args":[
"{
"spark" : {
"log.level" : "WARN"
},
"sinks" : [ {
"type" : "CONSOLE",
"config" : {
"max.log.lines" : 10
}
}, {
"type" : "HDFS",
"config" : {
"path" : "hdfs://localhost:8020/griffin/persist",
"max.persist.lines" : 10000,
"max.lines.per.file" : 10000
}
}, {
"type" : "ELASTICSEARCH",
"config" : {
"method" : "post",
"api" : "http://localhost:9200/griffin/accuracy",
"connection.timeout" : "1m",
"retry" : 10
}
} ],
"griffin.checkpoint" : [ ]
}",
"{
"measure.type" : "griffin",
"id" : 3355,
"name" : "schedule-job-zcg",
"owner" : "test",
"description" : "test",
"deleted" : false,
"timestamp" : 1569477360000,
"dq.type" : "ACCURACY",
"sinks" : [ "ELASTICSEARCH", "HDFS" ],
"process.type" : "BATCH",
"data.sources" : [ {
"id" : 3358,
"name" : "source",
"connectors" : [ {
"id" : 3359,
"name" : "source1569548839003",
"type" : "HIVE",
"version" : "1.2",
"predicates" : [ ],
"data.unit" : "1day",
"data.time.zone" : "",
"config" : {
"database" : "griffin_demo",
"table.name" : "demo_src",
"where" : "dt='20190927' AND hour = '09'"
}
} ],
"baseline" : false
}, {
"id" : 3360,
"name" : "target",
"connectors" : [ {
"id" : 3361,
"name" : "target1569548846717",
"type" : "HIVE",
"version" : "1.2",
"predicates" : [ ],
"data.unit" : "1day",
"data.time.zone" : "",
"config" : {
"database" : "griffin_demo",
"table.name" : "demo_tgt",
"where" : "dt='20190927' AND hour = '09'"
}
} ],
"baseline" : false
} ],
"evaluate.rule" : {
"id" : 3356,
"rules" : [ {
"id" : 3357,
"rule" : "source.id=target.id",
"dsl.type" : "griffin-dsl",
"dq.type" : "ACCURACY",
"out.dataframe.name" : "accuracy"
} ]
},
"measure.type" : "griffin"
}",
"raw,raw"
]
}
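As a rough illustration of how a body like the one above could be assembled and sent, here is a minimal Python sketch. It is not Griffin's Java implementation: `build_livy_batch_body` and `build_livy_request` are hypothetical helper names, and the sketch assumes that the first two entries of `args` are the env and measure configurations serialized as JSON strings, followed by the literal `raw,raw`. If that assumption holds, the inner quotes are escaped on the wire, whereas the dump above shows them unescaped for readability.

```python
import json
import urllib.request


def build_livy_batch_body(spark_props: dict, env: dict, measure: dict) -> dict:
    """Combine the Spark properties with the three job arguments."""
    body = dict(spark_props)  # shallow copy so the caller's dict is not modified
    # Each configuration is passed as one JSON-encoded string argument.
    body["args"] = [json.dumps(env), json.dumps(measure), "raw,raw"]
    return body


def build_livy_request(batches_url: str, body: dict) -> urllib.request.Request:
    """Prepare (but do not send) the POST request for the batches endpoint."""
    return urllib.request.Request(
        batches_url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    spark_props = {
        "file": "hdfs://localhost:8020/griffin/griffin-measure.jar",
        "className": "org.apache.griffin.measure.Application",
        "name": "griffin",
    }
    env = {"spark": {"log.level": "WARN"}, "sinks": []}
    measure = {"name": "schedule-job-zcg", "process.type": "BATCH"}
    body = build_livy_batch_body(spark_props, env, measure)
    request = build_livy_request("http://localhost:8998/batches", body)
    print(request.get_method(), request.full_url)
    print(json.dumps(body, indent=2))
```

Passing the prepared request to `urllib.request.urlopen` would perform the actual submission. The sketch stops short of that so it can be run without a Livy server.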