Windows主机漏洞扫描工具

发表于:2020-6-04 10:31

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:jerrybird    来源:freebuf

  0×00 说明:
  这是一款基于主机的漏洞扫描工具,采用多线程确保可以快速的请求数据,采用线程锁可以在向sqlite数据库中写入数据避免database is locked的错误,采用md5哈希算法确保数据不重复插入。
  本工具查找是否有公开exp的网站为shodan,该网站限制网络发包的速度,因而采用了单线程的方式,且耗时较长。
  功能:
  查找主机上具有的CVE
  查找具有公开EXP的CVE
  0×01 起因:
  因为需要做一些主机漏洞扫描方面的工作,因而编写了这个简单的工具。之前也查找了几款类似的工具,如下:
  vulmap:
  vulmon开发的一款开源工具,原理是根据软件的名称和版本号来确定,是否有CVE及公开的EXP。这款Linux的工具挺好用,但是对于Windows系统层面不太适用。
  windows-exp-suggester:
  这款和本工具的原理一样,尝试使用了之后,发现它的CVEKB数据库只更新到2017年的,并且没有给出CVE是否有公开的EXP信息。
  基于以上所以写了这个简单的工具,该项目在https://github.com/chroblert/WindowsVulnScan
  0×02 原理:
  1. 搜集CVE与KB的对应关系。首先在微软官网上收集CVE与KB对应的关系,然后存储进数据库
  2. 查找特定CVE网上是否有公开的EXP
  3. 利用powershell脚本收集主机的一些系统版本与KB信息
  4. 利用系统版本与KB信息搜寻主机上具有存在公开EXP的CVE
  0×03 参数:
   # author: JC0o0l
  # GitHub: https://github.com/chroblert/
  可选参数:
  -h, --help            show this help message and exit
  -u, --update-cve      更新CVEKB数据
  -U, --update-exp      更新CVEEXP数据
  -C, --check-EXP       检索具有EXP的CVE
  -f FILE, --file FILE  ps1脚本运行后产生的.json文件
  0×04 示例:
  1. 首先运行powershell脚本KBCollect.ps收集一些信息
 .\KBCollect.ps1
  2. 将运行后产生的KB.json文件移动到cve-check.py所在的目录
  3. 安装一些python3模块
 python3 -m pip install requirements.txt
  4. 运行cve-check.py -u创建CVEKB数据库
  5. 运行cve-check.py -U更新CVEKB数据库中的hasPOC字段
  6. 运行cve-check.py -C -f KB.json查看具有公开EXP的CVE,如下:
  0×05 源码:
  KBCollect.ps1:
   function Get-CollectKB(){
  # 1. 搜集所有的KB补丁
  $KBArray = @()
  $KBArray = Get-HotFix|ForEach-Object {$_.HotFixId}
  $test = $KBArray|ConvertTo-Json
  return $test
  }
  function Get-BasicInfo(){
  # 1. 操作系统
  # $windowsProductName = (Get-ComputerInfo).WindowsProductName
  $windowsProductName = (Get-CimInstance Win32_OperatingSystem).Caption
  # 2. 操作系统版本
  $windowsVersion = (Get-ComputerInfo).WindowsVersion
  $basicInfo = "{""windowsProductName"":""$windowsProductName"",""windowsVersion"":""$windowsVersion""}"
  return $basicInfo
  }
  $basicInfo = Get-BasicInfo
  $KBList = Get-CollectKB
  $KBResult = "{""basicInfo"":$basicInfo,""KBList"":$KBList}"
  $KBResult|Out-File KB.json -encoding utf8
  cve-check.py:
import requests
  import sqlite3
  import json
  import hashlib
  import math
  import re
  import threading
  import time
  import argparse
  from pathlib import Path
  # 删除一些ssl 认证的warnging信息
  requests.packages.urllib3.disable_warnings()
  ?
  ThreadCount=20
  DBFileName="CVEKB.db"
  TableName="CVEKB"
  insertSQL = []
  updateSQL = []
  lock = threading.Lock()
  KBResult = {}
  parser = argparse.ArgumentParser()
  parser.add_argument("-u","--update-cve",help="更新CVEKB数据",action="store_true")
  parser.add_argument("-U","--update-exp",help="更新CVEEXP数据",action="store_true")
  parser.add_argument("-C","--check-EXP",help="检索具有EXP的CVE",action="store_true")
  parser.add_argument("-f","--file",help="ps1脚本运行后产生的.json文件")
  args = parser.parse_args()
  ?
  class CVEScanThread(threading.Thread):
  def __init__(self,func,args,name="",):
  threading.Thread.__init__(self)
  self.func = func
  self.args = args
  self.name = name
  self.result = None
  ?
  def run(self):
  print("thread:{} :start scan page {}".format(self.args[1],self.args[0]))
  self.result = self.func(self.args[0],)
  print("thread:{} :stop scan page {}".format(self.args[1],self.args[0]))
  class EXPScanThread(threading.Thread):
  def __init__(self,func,args,name="",):
  threading.Thread.__init__(self)
  self.func = func
  self.args = args
  self.name = name
  self.result = None
  ?
  def run(self):
  print("thread:{} :start scan CVE: {},xuehao:{}".format(self.args[1],self.args[0],self.args[2]))
  self.result = self.func(self.args[0],)
  print("thread:{} :stop scan CVE: {}".format(self.args[1],self.args[0]))
  def get_result(self):
  threading.Thread.join(self)
  try:
  return self.result
  except Exception:
  return "Error"
  def get_page_num(num=1,pageSize=100):
  url = "https://portal.msrc.microsoft.com/api/security-guidance/en-us"
  payload = "{\"familyIds\":[],\"productIds\":[],\"severityIds\":[],\"impactIds\":[],\"pageNumber\":" + str(num) + ",\"pageSize\":" + str(pageSize) + ",\"includeCveNumber\":true,\"includeSeverity\":false,\"includeImpact\":false,\"orderBy\":\"publishedDate\",\"orderByMonthly\":\"releaseDate\",\"isDescending\":true,\"isDescendingMonthly\":true,\"queryText\":\"\",\"isSearch\":false,\"filterText\":\"\",\"fromPublishedDate\":\"01/01/1998\",\"toPublishedDate\":\"03/02/2020\"}"
  headers = {
  'origin': "https//portal.msrc.microsoft.com",
  'referer': "https//portal.msrc.microsoft.com/en-us/security-guidance",
  'accept-language': "zh-CN",
  'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299",
  'accept': "application/json, text/plain, */*",
  'accept-encoding': "gzip, deflate",
  'host': "portal.msrc.microsoft.com",
  'connection': "close",
  'cache-control': "no-cache",
  'content-type': "application/json",
  }
  ?
  response = requests.request("POST", url, data=payload, headers=headers, verify = False)
  resultCount = json.loads(response.text)['count']
  return math.ceil(int(resultCount)/100)
  ?
  def update_cvekb_database(num=1,pageSize=100):
  pageCount = get_page_num()
  #for i in range(1,pageCount+1):
  i = 1
  pageCount=524
  tmpCount = ThreadCount
  while i <= pageCount:
  tmpCount = ThreadCount if (pageCount - i) >= ThreadCount else pageCount - i
  print("i:{},pageCount-i:{},ThreadCount:{},PageCount:{}".format(i,pageCount-i,ThreadCount,pageCount))
  time.sleep(0.5)
  threads = []
  print("===============================")
  for j in range(1,tmpCount + 1):
  print("更新第{}页".format(i+j-1))
  t = CVEScanThread(update_onepage_cvedb_database,(i+j-1,j,),str(j))
  threads.append(t)
  for t in threads:
  t.start()
  for t in threads:
  t.join()
  # update_onepage_cvedb_database(num=i)
  i = i + tmpCount
  conn = sqlite3.connect(DBFileName)
  for sql in insertSQL:
  conn.execute(sql)
  conn.commit()
  conn.close()
  if tmpCount != ThreadCount:
  break
  def check_POC_every_CVE(CVEName=""):
  #apiKey = ""
  #url = "https://exploits.shodan.io/api/search?query=" + CVEName + "&key=" + apiKey
  url = "https://exploits.shodan.io/?q=" + CVEName
  try:
  response = requests.request("GET",url=url,verify=False,timeout=10)
  #total = json.loads(response.text)
  except Exception as e:
  print("Error,{}".format(CVEName))
  print(e)
  return "Error"
  if "Total Results" not in response.text:
  return "False"
  else:
  return "True"
  def update_onepage_cvedb_database(num=1,pageSize=100):
  url = "https://portal.msrc.microsoft.com/api/security-guidance/en-us"
  ?
  payload = "{\"familyIds\":[],\"productIds\":[],\"severityIds\":[],\"impactIds\":[],\"pageNumber\":" + str(num) + ",\"pageSize\":" + str(pageSize) + ",\"includeCveNumber\":true,\"includeSeverity\":false,\"includeImpact\":false,\"orderBy\":\"publishedDate\",\"orderByMonthly\":\"releaseDate\",\"isDescending\":true,\"isDescendingMonthly\":true,\"queryText\":\"\",\"isSearch\":false,\"filterText\":\"\",\"fromPublishedDate\":\"01/01/1998\",\"toPublishedDate\":\"03/02/2020\"}"
  headers = {
  'origin': "https//portal.msrc.microsoft.com",
  'referer': "https//portal.msrc.microsoft.com/en-us/security-guidance",
  'accept-language': "zh-CN",
  'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299",
  'accept': "application/json, text/plain, */*",
  'accept-encoding': "gzip, deflate",
  'host': "portal.msrc.microsoft.com",
  'connection': "close",
  'cache-control': "no-cache",
  'content-type': "application/json",
  }
  try:
  response = requests.request("POST", url, data=payload, headers=headers, verify = False)
  resultList = json.loads(response.text)['details']
  except :
  print(response.text)
  conn = sqlite3.connect(DBFileName)
  create_sql = """Create Table IF NOT EXISTS {} (
  hash TEXT UNIQUE,
  name TEXT,
  KBName TEXT,
  CVEName TEXT,
  impact TEXT,
  hasPOC TEXT)""".format(TableName)
  conn.execute(create_sql)
  conn.commit()
  conn.close()
  for result in resultList:
  KBName = result['articleTitle1'] + ";" if (result['articleTitle1'] !=  None) and result['articleTitle1'].isdigit() else ""
  KBName += result['articleTitle2'] + ";" if (result['articleTitle2'] != None) and result['articleTitle2'].isdigit() else ""
  KBName += result['articleTitle3'] + ";" if (result['articleTitle3'] != None) and result['articleTitle3'].isdigit() else ""
  KBName += result['articleTitle4'] + ";" if (result['articleTitle4'] != None) and result['articleTitle4'].isdigit() else ""
  if KBName == "":
  continue
  h1 = hashlib.md5()
  metaStr = result['name'] + KBName + result['cveNumber'] + result['impact']
  h1.update(metaStr.encode('utf-8'))
  #hasPOC = check_POC_every_CVE(result['cveNumber'])
  # 收集到所有的KB后再搜索有没有公开的EXP
  hasPOC = ""
  sql = "INSERT OR IGNORE INTO "+TableName+" VALUES ('" + h1.hexdigest() + "','" + result['name'] + "','" + KBName + "','" + result['cveNumber'] + "','" + result['impact'] + "','" + hasPOC+"')"
  with lock:
  global insertSQL
  insertSQL.append(sql)
  # conn.execute(sql)
  # conn.commit()
  # conn.close()
  # pass
  ?
  def select_CVE(tmpList=[],windowsProductName="",windowsVersion=""):
  conn = sqlite3.connect(DBFileName)
  con = conn.cursor()
  intersectionList = []
  count = 0
  for i in tmpList:
  sql = 'select distinct(CVEName) from '+ TableName+' where (name like "'+ windowsProductName+'%'+ windowsVersion + '%") and ("'+ i +'" not in (select KBName from '+ TableName +' where name like "'+ windowsProductName+'%'+windowsVersion+'%")); '
  cveList = []
  for cve in con.execute(sql):
  cveList.append(cve[0])
  if count == 0:
  intersectionList = cveList.copy()
  count +=1
  intersectionList = list(set(intersectionList).intersection(set(cveList)))
  intersectionList.sort()
  for cve in intersectionList:
  sql = "select CVEName from {} where CVEName == '{}' and hasPOC == 'True'".format(TableName,cve)
  # print(sql)
  con.execute(sql)
  if len(con.fetchall()) != 0:
  print("{} has EXP".format(cve))
  # print(intersectionList)
  def update_hasPOC(key = "Empty"):
  conn = sqlite3.connect(DBFileName)
  con = conn.cursor()
  if key == "All":
  sql = "select distinct(CVEName) from {}".format(TableName)
  else:
  sql = "select distinct(CVEName) from {} where (hasPOC IS NULL) OR (hasPOC == '')".format(TableName)
  ?
  con.execute(sql)
  cveNameList = con.fetchall()
  i = 0
  count = 1
  while i < len(cveNameList):
  print("|=========={}============|".format(i))
  # tmpCount = ThreadCount if (len(cveNameList) - i) >= ThreadCount else len(cveNameList) - i
  # threads = []
  # for j in range(1,tmpCount+1):
  #     t = EXPScanThread(check_POC_every_CVE,(cveNameList[i+j][0],j,i+j,),str(j))
  #     threads.append(t)
  # for t in threads:
  #     t.start()
  # for t in threads:
  #     t.join()
  # j = 1
  # for t in threads:
  #     hasPOC = t.get_result()
  #     print(hasPOC)
  #     update_sql = "UPDATE "+TableName+" set hasPOC = '" + hasPOC + "' WHERE cveName == '" + cveNameList[i+j][0] +"';"
  #     conn.execute(update_sql)
  #     print("[+] update:{}".format(update_sql))
  #     j += 1
  # i=i+ThreadCount
  # conn.commit()
  hasPOC = check_POC_every_CVE(cveNameList[i][0])
  time.sleep(0.3)
  update_sql = "UPDATE "+TableName+" set hasPOC = '" + hasPOC + "' WHERE cveName == '" + cveNameList[i][0] +"';"
  conn.execute(update_sql)
  print(update_sql)
  count += 1
  i += 1
  if count == 10:
  conn.commit()
  print("[+]update")
  count = 1
  conn.commit()
  conn.close()
  print("Over")
  ?
  ?
  if __name__ == "__main__":
  banner = """
  ========CVE-EXP-Check===============
  |       author:JC0o0l               |
  |       wechat:JC_SecNotes          |
  |       version:1.0                 |
  =====================================
  """
  print(banner)
  if (not args.check_EXP ) and (not args.update_cve) and (not args.update_exp) and args.file is None:
  parser.print_help()
  if args.update_cve:
  update_cvekb_database()
  if args.update_exp:
  dbfile=Path(DBFileName)
  if dbfile.exists():
  update_hasPOC(key="Empty")
  else:
  print("请先使用-u 创建数据库")
  parser.print_help()
  if args.check_EXP:
  dbfile=Path(DBFileName)
  if not dbfile.exists():
  print("请先使用-u 创建数据库,之后使用 -U 更新是否有EXP")
  parser.print_help()
  exit()
  if args.file:
  with open(args.file,"r",encoding="utf-8") as f:
  KBResult = json.load(f)
  windowsProductName = KBResult['basicInfo']['windowsProductName']
  windowsProductName = ((re.search("\w[\w|\s]+\d+[\s|$]",windowsProductName).group()).strip()).replace("Microsoft","").strip()
  windowsVersion = KBResult['basicInfo']['windowsVersion']
  print("系统信息如下:")
  print("{} {}".format(windowsProductName,windowsVersion))
  tmpKBList = KBResult['KBList']
  KBList = []
  for KB in tmpKBList:
  KBList.append(KB.replace("KB",""))
  print("KB信息如下:")
  print(KBList)
  print("EXP信息如下:")
  select_CVE(tmpList=KBList,windowsProductName=windowsProductName,windowsVersion=windowsVersion)
  else:
  print("请输入.json文件")
  

     本文内容不用于商业目的,如涉及知识产权问题,请权利人联系博为峰小编(021-64471599-8017),我们将立即处理
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号