关于bugscan的屁事

发布时间:March 14, 2017 // 分类:开发笔记,linux,python,windows // No Comments

** 关于bugscan的使用 **
从14年接触python,15年开始写bugscan插件到现在bugscan的改版.早起的爬虫,后来逆向客户端.主要的是为了它那个开源的插件,还有一个原因就是它的插件太方便了.它的生成有模板可用的.

早期写的教程 https://my.oschina.net/rookier/blog/393074

** 逆向的辛酸苦辣 **
最早我还会c++的时候写过一次爬虫
https://my.oschina.net/rookier/blog/395712
后来一直学习python又重新写了一次
http://0cx.cc/python_spider_bugscan.jspx
再后来发现部分插件是没法用的.于是又果断的逆向了客户端
http://0cx.cc/uncompyle_loads.jspx

** 关于插件的调用 **
这个最早的时候是批量调用
http://0cx.cc/bug_scan_vul.jspx
后来是逆向出插件一段时候后识别后调用
http://0cx.cc/which_cms_online.jspx
期间各种扫描分析
http://0cx.cc/bugscan_run_exec.jspx

** 继续插件调用的爱恨情愁 **

python -c "exec(__import__('urllib2').urlopen('http://t.cn/xxxxx').read())" -m 5

这个url会经过短地址还原,带上自己的ID向服务器发起rpcs认证请求。详情参照http://0cx.cc/uncompyle_loads.jspx

if Debug_X(debugkey_str, 'main'):
        print 'tid=', thread.start_new_thread(Mysql_insert_QueueInfo, ())

    if '_S' not in globals():
        _S = 'https'
    if '_U' not in globals():
        _U = 'your ID just like 0000000000000'
    if '_B' not in globals():
        _B = 'old.bugscan.net'

    Urls = '%s://%s/rpcs' % (_S, _B)
    Login_Get = Loginx(_U, Urls)
    VER_INT = 1.95
    UPdate_yes_no = False
    Plugin_Code = {}

追查下Loginx

class Loginx(object):
    def __init__(self, uhash, serviceURL, serviceName=None):
        self.__serviceURL = serviceURL
        self.__serviceName = serviceName
        self.__uhash = uhash

    def __call__(self, *args):

        PC_uuid = str(uuid.uuid1())
        Login_Info_json = json.dumps({'method': self.__serviceName,
                          'params': args,
                          'uid': self.__uhash,
                          'uuid': PC_uuid})

        Mysql_table_insert('rpclog', method=self.__serviceName, params=repr(args), uid=self.__uhash, uuid=PC_uuid)

        Login_Info_json = md5.md5(Login_Info_json).hexdigest() + '|' + zlib.compress(Login_Info_json, 9)

        ServerURL = self.__serviceURL
        INT_15 = 15
        for For_int in range(INT_15):
            try:
                HTTP_getRead = None

                for For_int in range(3):

                    ServerInfo = urlparse.urlparse(ServerURL)

                    if ServerInfo.scheme == 'https':
                        Http_Obj = HttpLib(ServerInfo.hostname, ServerInfo.port, timeout=60)
                    else:
                        Http_Obj = httplib.HTTPConnection(ServerInfo.hostname, ServerInfo.port, timeout=60)

                    Http_Obj.putrequest('POST', ServerInfo.path)
                    Http_Obj.putheader('Content-Length', str(len(Login_Info_json)))
                    Http_Obj.putheader('Content-Type', 'application/json')
                    Http_Obj.endheaders()

                    Http_Obj.send(Login_Info_json)

                    HTTP_getResponse = Http_Obj.getresponse()

                    HTTP_getHeaders = dict(HTTP_getResponse.getheaders())

                    if HTTP_getHeaders.has_key('location') and HTTP_getHeaders['location'] != ServerURL:

                        Http_Obj.close()
                        ServerURL = HTTP_getHeaders['location']

                    else:

                        HTTP_getRead = HTTP_getResponse.read()
                        Http_Obj.close()
                        break

                if not HTTP_getRead:
                    raise IOError('Content empty')
                    # print HTTP_getRead

                Find_return = HTTP_getRead.find('|')
                #-------------------
                Code_md5, code_json = HTTP_getRead[:Find_return], zlib.decompress(HTTP_getRead[Find_return + 1:])

                if Code_md5 != md5.md5(code_json).hexdigest():

                    raise IOError('json decode error')

                HTTP_getResponse = json.loads(code_json)

                if PC_uuid != HTTP_getResponse['uuid']:

                    raise IOError('UUID unmatched')

            except Exception as o0oo0o0O00OO:

                if For_int == INT_15 - 1:
                    Logging_Obj.exception(Get_lineNumber_fileName())
                    raise o0oo0o0O00OO
                else:
                    time.sleep(5)
            else:
                if HTTP_getResponse['error'] != None:
                    raise RpcError(HTTP_getResponse['error'])
                else:
                    return HTTP_getResponse['result']

就是带上自己的专属id,然后直接以json格式访问rpcs地址.整个过程需要正确认证才有HTTP_getResponse['result']的返回..如果认证成功以后,会从rpcs地址获取任务

Task_List = Login_Get.get_task_list(SID_INT, Scan_thread_idel)#获取任务列表

Task_List里面包含了相关的信息

{
    u'tasks': [{u'policy': u'base64.encode(plugins)', u'id': taskid, u'target': u'testphp.acunetix.com'}], 
    u'nodever': u'0', 
    u'stops': []
}

Task_List['tasks']里面包含了相关的信息

{
    u'policy': u'base64.encode(plugins)', 任务参数
    u'id': taskid, 任务id
    u'target': u'testphp.acunetix.com'任务目标
}

Tasks_n 里面包含了相关的信息

Tasks_n = {
    u'subdomain': True, 
    u'scanport': True, 
    u'maxtask': 7000, 
    u'useragent': u'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; .NET CLR 2.0.50727)', 
    u'anyone': True, 
    u'user_dict': u'', 
    u'timeout': 24, 
    u'plugins': {}, 
    u'entry': u'http://0day5.com/', 
    u'nodes': [xxxx], 任务节点id
    u'pass_dict':
    u'', u'speed': 6
}

Tasks_n包含了填写的基本信息

Plugins_list = json.loads(base64.decodestring(Tasks_n['policy']))

获取到基本的信息以后久进入到了ProcessWork中.需要关注的就是target和Plugins_list是Tasks_n里面的到的基本信息

def ProcessWork(glock, gdebug, debugkey, uhash, rpc_server, tid, target, Plugins_list):
    global Debug_yes_no
    global sys_debug_yes_no
    global Login_Get
    global debug_key
    global Multiprocessing_RLock
    Multiprocessing_RLock = glock
    sys_debug_yes_no = gdebug
    debug_key = debugkey
    Debug_yes_no = True
    Loginx_Obj = Loginx(uhash, rpc_server)
    Oo000ooOOO = None
    if Debug_X(debugkey):
        thread.start_new_thread(Mysql_insert_QueueInfo, ())
    try:
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        Loginx_Obj.set_task_status(tid, Range_3)
        Exploit_run_Obj = Exploit_run(tid, target, Plugins_list)
        if 'entry' in Plugins_list:
            if Plugins_list['entry'].startswith('http'):#  String.startswith('xxx')判断开头是否是xxx
                Exploit_run_Obj.task_push('www', str(Plugins_list['entry']))
            else:
                Exploit_run_Obj.task_push('www', 'http://%s%s' % (target, Plugins_list['entry']))
        if not Target_isOK(target):
            Exploit_run_Obj.task_push('dns', target) # task_push(self, service, arg, uuid=None, target=None, pr=-1):
        Exploit_run_Obj.task_push('www', 'http://%s/' % target)
        Exploit_run_Obj.run()
    except (KeyboardInterrupt, SystemExit):
        pass
    except Exception as o0oo0o0O00OO:
        Logging_Obj.exception('ProcessWorker:<%d %s>' % (tid, target))
    finally:
        Loginx_Obj.set_task_status(tid, Range_5)
        if Mysql_Obj:
            Mysql_table_insert('loglog', body='exit')
    addTargetModule(target)

在Exploit_run里面的函数中能看到判断插件是否加密的部分

for plugin_x in policy['plugins']:
    Plugin = policy['plugins'][plugin_x]
    OO0OoOO0o0o = 0
    if imp.get_magic() == Plugin[:4]:
        #判定为加密的文件
        oo = marshal.loads(Plugin[8:])
        OO0OoOO0o0o = struct.unpack('<l', Plugin[4:8])[0]
    else:
        oo = Plugin
    """"""
    def _load_module(self, chunk, name='<memory>'):
        II = imp.new_module(str(name))
        exec chunk in II.__dict__
        return II
    """"""
    #imp加载文件
    II = self._load_module(oo)
    o00oo0 = None
    if OO0OoOO0o0o > 1440345600:
        #获取解密的key
        o00oo0 = Plugin[-48:-16]
        I11ii1IIiIi = Plugin[-16:]
        if I11ii1IIiIi != md5.new(Plugin[:-16]).digest()[::-1]:
            #表示不匹配
            pass
    #涉及到对II的参数补充,针对加密的插件用获取的key进行解密,
    self._patch_module(II, o00oo0)
    然后获取节点的dns,各种其他的信息.进入到task_pusk函数中.
    主要是推送的服务和地址
    def task_push(self, service, arg, uuid=None, target=None, pr=-1):
        for OO0O0 in self._modules:
            获取插件id和插件文件内容
            i1OOO0000oO = self._modules[OO0O0].assign(service, arg)
    #接下去就是判断i1OOO0000oO也就是插件运行的结果是不是一个数据
    if not isinstance(i1OOO0000oO, tuple):
        continue

所以,如果自己需要构建全部的bugscan的插件扫描功能的话.感觉还是比较简单.获取全部的插件,需要更改的就是节点中的target,还有entry,删掉或者注释掉登陆验证的请求的地方,按照它本身的结果写入或者重构数据库.其他的就静静的等待结果就好了.反正都是全部fuzz一遍.

 Mysql_table_insert('loglog', body=self.format(record))

Mysql_table_insert('rpclog', method=self.__serviceName, params=repr(args), uid=self.__uhash, uuid=PC_uuid)

Mysql_table_insert('tasklog', uuid=uuid, plugin_id=Scan_ThreadLocal.__pid, service=service, arg=repr(arg),

Mysql_table_insert('assignlog', time=int(i11i1ii1I - iI1i111I1Ii), uuid=hash, plugin_id=OO0O0,service=service, prearg=repr(arg), arg=repr(oOoO00o), isret=1, push=O0O0Oo00, i=ai + 1)

Mysql_table_insert('auditlog', uuid=md5.md5(url).hexdigest(), plugin_id=OO0O0, type=1, arg=repr(url),time=int(Day_time_2 - Day_time_1))

Mysql_table_insert('debuglog', plugin_id=Scan_ThreadLocal.__pid, body=fmt % args)

更改了一下获取的方式,因为我发现部分插件无法入库。所以入库的时候全部采用了base64编码的形式入库

                    if iIIi:
                        o0O0O00('[***] fetch %d new plugins', len(iIIi))
                        i1iiIiI1Ii1i = i1.get_plugin_list(iIIi)

                        for hash in i1iiIiI1Ii1i:
                            i1iIi = i1iiIiI1Ii1i[hash]
                            oO00oo0o00o0o[hash] = (i1iIi[0], zlib.decompress(binascii.a2b_hex(i1iIi[1])))
                            #get
                            try:
                                dbcon = Mysqlclass()
                                sql = "insert `plugin` (`name`,`upid`,`content`) VALUES (%s, %s, %s)"
                                args = str(oO00oo0o00o0o[hash][0]),str(hash),base64.b64encode(str(oO00oo0o00o0o[hash][1]))
                                print str(oO00oo0o00o0o[hash][0])
                                dbcon.exec_sql(sql,args)
                            except Exception, e:
                                print str(oO00oo0o00o0o[hash][0]),str(e)
                                pass

然后按照它本身的步骤,从数据库里面获取插件出来调用.最后的形式就是.

def get_plugins():
    plugins = {}
    sql = "select name,content from `plugin`"
    results = dbconn.select_result(sql)
    for result in results:
        pocid = int(result['name'])
        plugins[pocid] =base64.decodestring(result['content'])
    return plugins

def main(target):
    if target == '':
        target = domain
    plugins = get_plugins()
    glock = None
    gdebug = "0"
    debugkey = ''
    policy = {
        u'subdomain': True, 
        u'scanport': True, 
        u'maxtask': 7000, 
        u'useragent': u'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; .NET CLR 2.0.50727)', 
        u'anyone': True, 
        u'timeout': 24, 
        u'plugins': plugins,#{2051: '',1070: '', 1072: '', 1073: '', 1074: ''}
        u'target': target, #可以为域名和ip
        u'user_dict': u'', 
        u'pass_dict': u'', 
        u'speed': 6
    }
    tid = 1234
    service = 'www'
    ProcessWork(glock, gdebug, debugkey, tid, service, policy)

if __name__ == '__main__':
    import sys
    if len(sys.argv)==2:
        domain = sys.argv[1]
        main(domain)
    else:
        print ("%s http://fuck.0day5.com" % (sys.argv[0]))

标签:none

添加新评论 »

分类
最新文章
最近回复
  • 轨迹: niubility!
  • 没穿底裤: 好办法..
  • emma: 任务计划那有点小问题,调用后Activation.exe不是当前活动窗口,造成回车下一步下一步...
  • 没穿底裤: hook execve函数
  • tuhao lam: 大佬,还有持续跟进Linux命令执行记录这块吗?通过内核拦截exec系统调用的方式,目前有没有...