我们是这样做大数据测试

上一篇 / 下一篇  2021-01-07 15:42:07 / 个人分类:大数据

  先交代一下背景,上游各种业务系统数据进入到TIDB,再TIDB到HIVE。开发同事开发一个报表,需要写flow,一个flow会包含一个或者多个job。每个job就是执行sql,从HIVE表中查询数据,根据业务逻辑计算数据,将结果写入HIVE,最终生成报表。flow执行过程是由azkaban调度。
  我们大数据及报表的测试,分成数据链路,任务执行,报表逻辑三块。针对任务执行,我们做了一个工具来支撑测试。这个测试工具包括了手动部署任务,自动化部署,静态测试及任务失败原因分析。
  测试工具的示意图来一个。
  第一部分,更新环境名及分支信息。这是很多工具共用的。很多工具需要知道当前应该跑什么分支,在哪个环境执行。所有就做了一个公共的工具。在文件中配置了版本日期及对应的环境,比较当前日期和版本日期关系,可以算出应该跑什么分支的代码,在哪套环境运行。每天0点1分执行,将结果写入一个配置文件,如branch=20.12.12,env=test2,当天所有的需要用到这两个参数的都可以到这个配置文件来取。
  第二部分,更新代码库,先去第一部分获取当前的环境和版本信息,然后去更新代码库。
basepath=${cd `dirname $0`;pwd}
if [ ! $1 ];then
   echo 'miss argumet'
   exit 1
fi
cd ${basepath}'/code/reponame'
git clean -xdf
git reset --hard
git checkout 'branch'$1
if [ $? != 0 ];then
    git checkout master
    runbranch='master'
else
    runbranch='branch'${1}
fi
git pull
if [ $? == 0 ];then
    echo 'pull successfully'
else
    echo 'pull fail,exit'
    exit 1
fi
  更新代码库后做一些替换,因为我们生产一个库,测试环境多有个库,开发提测的代码是按照生产的库名。所以在测试环境执行时需要替换成对应的库名即需要环境名。
if [ ! $2 ];then
    echo 'miss argument'
    exit 1
elif [ $2 == 'full' ];then
    sourcedir='aa/bb/cc'
    targetdir='bb/cc/dd'
else
    sourcedir='AA/BB/BB/'$2
    if [ -d ${sourcedir} ];then
        target='BB/CC/DD/'$2
    else
        echo 'flow not exist'
        exit 1
    fi 
fi 
function getfilelist(){
    for file in `$1`
        do
        dorf=$1'/'${file}
        if [ -d ${dorf} ];then
           getfilelist ${dorf}
        else
            echo ${dorf}
        fi
        done
 filelist=`getfilelist ${sourcedir}`
 baddbname=${basepath}'/tmp/baddb.log'
 cd ${basepath}'/tmp'
 if [ -f ${baddbname} ];then
     rm -rf ${baddbname}
 fi
 touch ${baddbname}
 cd -
 
 for file in ${filelist}
     do
     baddb=`sed -n /_test1\丨_test2\丨_dev\./Ip ${file}`
     if [ -n "${baddb}" ];then
         echo ${file}  丨awk -F 'code' '{print $2}' >> ${baddbname}
         echo ${baddb} >> ${baddbname}
     fi
     sed -i "s/hivedb\./hivedb_${1}\./ig" ${file}
     done 
 
sshpass -p password rsync -rpt ${sourcedir} user@host:${targetdir}
if [ $? == 0 ];then
    echo 'delopy successfully'
else
    echo 'delopy fail'
    exit 1
fi
  到这里,我们更新了最新的代码库,静态扫描是不是有库名写错的,然后替换成测试环境的库名。然后将文件部署到目前服务器,就是azkaban的task目录。库名如果有问题写入日志中。
  第三部分了,我们就打包azkaban的job,在打包前先把要替换的进行替换这里的$1是指环境名如test1。
sourcedir=${basepath}'/code/azkaban/'
tmpdir=${basepath}/'tmp/'
envpath=${basepath}'/code/azkaban/env.properties'
sed -i "s/mammoth\.sh/mammoth${1}\.sh/ig" ${envpath}
cd ${sourcedir}
zipdir=${tmpdir}'/azkaban.zip'
if [ -f ${zipdir} ];then
    rm -rf ${zipdir}
fi
zip -r ${zipdir} azkaban
cd -
  需要把azkaban的包部署到服务器,azkaban提供了接口上传。记录上传的结果到日志文件。
def uploadzip(filepath):
     files = {'file':('azkaban.zip',open(filepath,'rb'),'application/zip')}
     uploaddata = {'project':'project','file':files,'ajaz':'upload'}
     try:
         r1 = requests.post(managerul,data=uploaddata,file=files).json()
         if 'projectId' in r1.keys():
             result = 'upload successfully'
         else
             result = 'upload fail'
      except:
             result = 'upload fail'
     return result
  第四部分,配置定时任务,同样azkaban有接口可以调用。定时任务脚本的MD5值检查一下,如果有变化就执行,没有就不用执行了。
basepath=${cd `dirname $0`;pwd}
parpath=$(dirname ${basepath})
source ${parpath}'/branch.config'
echo ${branch}
fullbranch='20'${branch//\./}
oldmdfile=${basepath}'/tmp/md5.config'
arrary=(${1//,/ })
for repo in ${arrary[@]}
    do
    schedulefile=${basepath}'/code/'${repo}'/schedule/'${fullbranch}'/schedule.txt'
    if [[  -s ${schedulefile} ]];then
        newmd5=`md5sum ${schedulefile} 丨 awk '{print $1}'`
        oldmd5=`sed '/^${repo}='/!d;s/.*=//` ${oldmdfile}
        sed -i '/'${repo}'/d' ${oldmdfile}
        echo ${repo}'='${newmd5} >> ${oldmdfile}
        if [ "${newmd5}" != "${oldmd5}" ];then
            python3 schedule.py ${repo} ${schedulefile}
        fi
    fi
done
  如果schedule文件有更新,就调用接口,没有就不更新。将更新结果写入日志中。
reponame = sys.argv[1]
schedule_fule = sys.argc[2]
def configschedule():
    url = 'host:8000/schedule'
    data = {'projectName':project,'flow':flow,'cronExpression':'cron','ajax':'scheduleCronFlow'}
    r = reuqests.post(url,data)
  在任务执行前,我们还有一个静态测试,就是任务会检查依赖的表是否都完成了同步。如果没有完成同步,任务执行不下去。所以我们在任务执行前先检查所有的表是不是都完成merge,如果没有的,插入一个成功标识,保证job可以运行。并且将没有merge完成的表写入日志。
def getmergetable(env):
    fulltablesql = '''select task_name from table1 where a='bb' '''
    passtablesql = '''select tablename from table2 where status='PASS' '''
    failtable = []
    fulltable = exelistsql(env,fulltablesql)
    successtable = exelistsql(env,passtablesql)
    conn = connectdb(env)
    cursor = conn.cursor()
    for table in fulltable:
        if table not in successtable:
            failtable.append(table)
            sql = '''insert into table2(....) VALUES (.....%s)'''%(table)
            cursor.execute(sql)
    conn.ommit()
    conn.close()
  以上这些更新,都是通过定时任务来完成,更新完这些之后,我们就完成了全部的部署,包括job,task,schedule。每天0点之后,第一个job执行之前完成这些操作。中间时间段就是job自己运行了。
  早上8点20,正常情况任务基本都已经运行完成,开始分析结果。
  先去查询失败的job。将失败job的exec_id复制到azkaban服务器。
def executefaillist():
    failjob = querydb()
    failexclist = []
    for job in failjob:
        failexclist.append(job['exec_id'])
    failliststr = "execids='" + ' '.join(failexclist) + "'"
    wirtedatatofile(failliststr)
    execmd = 'sh ' + currentpath + '/sendfaillist.sh'
    status,output = execute_cmd(execmd)
  失败的exec_id复制到目标服务器之后,执行log解析。
#!/user/bin/expect

set user [lindex $argv 0]
set ip [lindex $argv 1]
set password [lindex $argv 2]

spawn ssh $user@$ip
set timeout 120
expect{
"yes/no"{send "yes\r"}
"password" {send "$password\r"}
}
send "path/script.sh&\r"
send "exit\r"
expect eof
  单个执行
mainpath='path/azakban/executions/'${1}
resultfile='result/result.txt'
jobinfo=`ls ${mainpath}/_flow.${1}.log`
if [[ ${jobinfo} ]];then
    workdir=`cat ${jobinfo} | grep 'working.dir' |awk -F 'working.dir' '{print $2}' |awk -F ',' '{print $1}' |sort -u`
  for logfile in ${workdir}
  do
     loglist = `ls ${logfile}/*.log`
     for filepath in ${loglist}
         failreason=`cat ${filepath} | grep 'ActiongException|AnalysisException|SparkException|JsonParseException' -m1 -A1`
     if [[ ${failreason} ]];then
         echo $1'|'${filepath##*/}'|'${failreason} >>${resultfile}
     fi
  done
  执行所有的
mainpath='path/azakban/executions/'${1}
resultfile='result/result.txt'
jobinfo=`ls ${mainpath}/_flow.${1}.log`
if [[ ${jobinfo} ]];then
    workdir=`cat ${jobinfo} | grep 'working.dir' |awk -F 'working.dir' '{print $2}' |awk -F ',' '{print $1}' |sort -u`
  for logfile in ${workdir}
  do
     loglist = `ls ${logfile}/*.log`
     for filepath in ${loglist}
         failreason=`cat ${filepath} | grep 'ActiongException|AnalysisException|SparkException|JsonParseException' -m1 -A1`
     if [[ ${failreason} ]];then
         echo $1'|'${filepath##*/}'|'${failreason} >>${resultfile}
     fi
  done
  azkaban也提供了日志查询接口,也可以通过接口去查询日志后解析返回值。但是效率是差很多的。以上方法,几百个文件,1分钟以内可以搞定。
  8:28,最后一个定时任务,查询所有任务的运行结果,从远程下载失败日志,和前面一样用sshpass就可以了。然后就是查询前面静态测试和各个环节的日志。拼装成一个html文件。以表格的形式直接展示在邮件正文中。哪个环境有哪些问题,失败的日志原因是什么。
mailconent1 = '''<!DOCTYPE html> <html><head>...</head><body>'''
mailconent2 = '''</body></html>'''
mailtable = '''<table....><thead>
 <tr....>
 <td ...>project</td>
 <td....>exec_id</td>
 <td....>flow_id</td>
 <td....>status</td>
 <td....>start_time</td>
 <td....>end_endtime</td>
 <td....>takes_time</td>
 <td....>reason</td>
 '''
  上班前,我们就可以收到结果邮件,早上上班就可以看到所有任务执行情况了。通过一段时间运行,发现了一些潜在风险问题,测试环境提前发现,避免了生产出现问题。也发现了不少开发同事提交代码没有自测,直接失败的。
  再补充一下页面部署工具,在白天大家的测试过程中,会需要部署最新的代码,所以我们提供了页面给到大家用。

def delopy(projectid,branch,env,flow):
    output = ''
    updatecmd = 'sh updaterepo.sh ' + branch
    replacecmd = ' sh replace.sh ' + env  + ' ' + flow
    status1,output1 = execute_cmd(updatecmd)
    status2,output2 = execute_cmd(replacecmd)
    output = output1 + output2
    if status1 == 0 and status2 == 0:
        delopyresult = 'PASS'
    else:
        delopyresult = 'FAIL'
    return delopyresult,output

TAG: 软件测试技术 大数据测试

 

评分:0

我来说两句

Open Toolbar