windows进程监控之恶意识别

发布时间:January 11, 2019 // 分类:工作日志,开发笔记,linux,python,windows // No Comments

前不久更新了针对windows下命令记录的种种,并利用beat对系统进行监控。达到了利用winlogbeat来发送sysmon记录的日志到logstash.采用elk进行统一展示的过程。这些主要的目的就是收集所有的日志,但是还缺少对日志的进一步分析。因此对这个想法做了一个简单的demo

PS:现场部署以后会发现大量的数据写入es的时候会超时,导致数据写入不完整。想着在中间放一个中间件来缓冲一下写入的频率。这里选择用kafka来实现

简单的数据拉取

import time
from elasticsearch.exceptions import TransportError
from elasticsearch import Elasticsearch
from units import match_rule

# Client for the local (SSH-tunnelled) Elasticsearch instance.
es = Elasticsearch()

# Forward the remote ES/Kibana ports before running:
#   ssh -CfNg -L 9200:127.0.0.1:9200 root@192.168.0.xxx
#   ssh -CfNg -L 5601:127.0.0.1:5601 root@192.168.0.xxx

# Index pattern that winlogbeat writes the Sysmon events into.
index = 'windowsevt*'

# Pull the 20 oldest Sysmon process-creation events (event_id == 1).
query = '''{
       "query":{
            "bool":{
                "must":[
                    {
                        "match":{
                            "source_name":"Sysmon"
                        }
                    },
                    {
                        "term":{
                            "event_id":1
                        }
                    }
                ]
            }
        },
      "sort":{
            "@timestamp":{ 
                "order":"asc"
            }
        },
    "from": 0,
    "size": 20
}'''
try:
    resp = es.search(index, body=query)
    # Total hit count -- NOTE(review): a dict on ES >= 7, an int before;
    # confirm against the deployed cluster version.
    total = resp['hits']['total']
    # Fix: the original mixed py2 print statements with print(); these
    # single-argument print() calls behave identically on py2 and py3.
    print(total)
    for item in resp['hits']['hits']:
        # Run every event's data through the local rule matcher.
        print(match_rule(item['_source']['event_data']))
except TransportError as e:
    # Surface the structured ES error payload instead of a bare traceback.
    print(e.info)

当把数据从es拉出来以后需要从三个维度进行分析(hash,命令和进程产生网络)

利用hash进行匹配,默认记录的是sha1,但是加载了配置文件后改成了md5/sha256。主要依靠第三方的一些平台来进行匹配,分别采用了ti.360.net和s.threatbook.cn两个。但是发现360对相关的搜索有限制(但是提供了api搜索,不过没有找到),微步还好一些(提供api搜索和爬虫两种)。最后采用了微步的搜索结果。

class Threatbook(object):
    """Query ThreatBook (s.threatbook.cn) for intelligence on a file hash.

    get_result_api() uses the official v2 report API; get_result() scrapes
    the v3 webpage endpoints (search -> summary -> static -> sandbox) and
    aggregates the pieces into one result.
    """

    def __init__(self,hashes,method):
        # hashes: hex digest string; method: digest algorithm name
        # (md5 / sha1 / sha256, lower-cased by the caller).
        self._hash = hashes
        self._method = method
        self._request = requests.session()
        # Fix: the original dict literal declared 'Cookie' twice; in a dict
        # literal the last value silently wins, so the first entry
        # ('session=cookies') was dead code and has been removed.
        self._headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36',
            'Accept': 'application/json, text/javascript, */*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,es;q=0.8,fr;q=0.7,vi;q=0.6',
            'Origin': 'https://s.threatbook.cn',
            'Content-Type': 'application/json',
            'Cookie': 'cookies'
        }
        # Human-readable labels for the ThreatBook threat levels.
        self.webu = {
            'malicious':'检测为恶意文件',
            'suspicious':'提示为可疑文件',
            'clean':'文件暂未发现可疑'
        }


    def get_result_api(self,sandbox_type,sha256):
        """Fetch the sandbox report summary via the official v2 API.

        NOTE(review): 'apikey=apikey' is a scrubbed placeholder -- supply a
        real API key before use.
        """
        url = 'https://s.threatbook.cn/api/v2/file/report'
        data = ("apikey=apikey&sandbox_type={sandbox_type}&sha256={sha256}").format(sandbox_type=sandbox_type,sha256=sha256)
        response = requests.get(url+"?"+data,timeout=30,verify=False)
        content = json.loads(response.content)
        return json.dumps(content['data']['summary'])

    def get_result(self):
        """Scrape the v3 webpage endpoints and merge the results.

        Returns a JSON string on success, a (possibly partial) dict when no
        sandbox entry is listed, or None on HTTP failure / exception.
        """
        item = {}
        url0 = "https://s.threatbook.cn/api/v3/webpage/search"
        url1 = "https://s.threatbook.cn/api/v3/webpage/summary/{sha256}"
        url2 = "https://s.threatbook.cn/api/v3/webpage/static/{sha256}"
        url3 = "https://s.threatbook.cn/api/v3/webpage/sandbox/{sha256}"
        # Request-body template: 'md5hases' must be replaced before 'method',
        # otherwise the second replace would clobber the key name.
        data = '''{"method":"md5hases"}'''
        try:
            resp0 = self._request.post(url0,data=data.replace('md5hases',self._hash).replace('method',self._method),headers=self._headers,timeout=30,verify=False)
            if resp0.status_code !=200:
                return
            content0 = json.loads(resp0.content)
            info = ""
            # Fix: the original bare 'except: pass' blocks swallowed every
            # exception; narrowed to the lookup errors actually expected here.
            try:
                if "multi_engines" in str(content0['data']):
                    item["multi_engines"] = content0['data'][0]["multi_engines"]
            except (KeyError, IndexError, TypeError):
                pass
            try:
                if "judgment" in str(content0['data']):
                    item["judgment"] = content0['data'][0]["judgment"]
            except (KeyError, IndexError, TypeError):
                pass

            # Keep the last listed sandbox run as "<sha256>-<sandbox_type>".
            for xfile in content0['data']:
                info = xfile['sha256'] + "-"+ xfile['sandbox_type']

            if len(info)==0:
                # No sandbox entry: return whatever was gathered so far
                # (a dict, unlike the JSON string returned below).
                return item

            url1 = url1.format(sha256=info)
            url2 = url2.format(sha256=str(info.split('-')[0]))
            url3 = url3.format(sha256=str(info))

            # Summary page: threat level, tags and other overview fields.
            resp1 = self._request.get(url1,headers=self._headers,timeout=30,verify=False)
            content1 = json.loads(resp1.content)
            tables = content1['data']
            tagx = []
            for table in tables:
                if table == "threat_level":
                    item[table] = self.webu.get(tables[table])
                elif table == "tag":
                    # Flatten the per-source tag lists and deduplicate.
                    for name in tables[table]:
                        tagx.extend(tables[table][name])
                    item[table] = list(set(tagx))
                else:
                    item[table] = tables[table]

            # Static analysis page: copy the 'basic' file properties verbatim.
            resp2 = self._request.get(url2,headers=self._headers,timeout=30,verify=False)
            content2 = json.loads(resp2.content)
            basics = content2['data']['basic']
            for basic in basics:
                item[basic] = basics[basic]

            # Sandbox page (doubled timeout: this endpoint is slow): IOCs
            # plus a deduplicated union of their tag lists.
            resp3 = self._request.get(url3,headers=self._headers,timeout=30*2,verify=False)
            content3 = json.loads(resp3.content)
            iocs = content3['data']['ioc']
            if len(iocs)>0:
                item["ioc"] = iocs
            mtasg = []
            for iocx in iocs:
                mtasg.extend(iocx['tag_list'])
            if len(list(set(mtasg)))>0:
                item["ioctag"] = list(set(mtasg))
            return json.dumps(item)

        except Exception as why:
            print "error",why
            traceback.print_exc()

这里还可以对文件hash做一个去重查询的处理,一旦确定某些文件的hash是正常的就没有必要调用查询接口,还可以节省资源。

匹配进程参数信息。主要是针对一些探测类的命令。这类命令本身并不是恶意进程,而是系统自带的工具。因此对其增加规则来达到检测恶意命令的匹配。

(这里采取regex和match两种方式,因此在match的时候误报率有点高)一旦匹配上就输出可疑信息

def match_rule(event_data):
    hashes = event_data['Hashes']
    if "," in hashes:
        Hashe = hashes.split(',')[0]
    else:
        Hashe = hashes
    method,nhashes = str(Hashe.split('=')[0]).lower(),str(Hashe.split('=')[1]).lower()
    ti = Threatbook(nhashes,method)
    print ti.get_result()
    for name in event_data:
        if name in ["ParentCommandLine","CommandLine","Image","ParentImage"]:
            result = check_rule(event_data[name])
            if len(result)>0:
                print name,event_data[name],json.dumps(result)

网络地址匹配主要是根据event_id为3的网络进程来实现的。这里弄了好久,只能用一个受限制的地址来实现

class Venuseye(object):
    def __init__(self,ip):
        self._ip = ip
        self._request = requests.session()
        self._headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,es;q=0.8,fr;q=0.7,vi;q=0.6',
            'X-Requested-With': 'XMLHttpRequest',
            'Referer': 'https://www.venuseye.com.cn/ip/',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
        }

    def get_ip_res(self):
        item = {}
        url = "https://www.venuseye.com.cn/ve/ip"
        data = "target="+str(self._ip)
        try:
            resp = requests.post(url,data=data,headers=self._headers,timeout=30,verify=False)
            if resp.status_code !=200:
                return item
            content = json.loads(resp.content)
            print json.dumps(content['data'])
        except Exception as why:
            print "error",why
            traceback.print_exc()

实际测试并发量高了以后,直接就被拦截了。真难受,所以还需要一个代理池来维持访问结果。

# Basic-auth tokens for the proxy pool; general() validates one at random.
# NOTE(review): 'apikeys' is a scrubbed placeholder -- fill in real keys.
appKeys = [apikeys
]

def general(appKey):
    """Probe the proxy with the given Basic-auth appKey.

    Fetches a geo-IP page through the proxy; if the proxy relays the
    request and the body contains a JSON blob, the key works.

    Returns the appKey itself on success, otherwise False.  (Fix: the
    original returned `flag` only from the except branch, so the non-200
    and no-match paths fell through returning None; now every path returns
    the False/appKey flag explicitly.)
    """
    flag = False
    # NOTE(review): scrubbed placeholder -- real proxy host:port goes here.
    ip_port = 'someprooxys'
    proxy = {"http": "http://" + ip_port, "https": "https://" + ip_port}
    headers = {"Authorization": 'Basic '+ appKey}
    try:
        r = requests.get("http://pv.sohu.com/cityjson?ie=utf-8", headers=headers, proxies=proxy,verify=False,allow_redirects=False,timeout=30)
        if r.status_code == 200:
            # The endpoint returns JS containing one JSON object; any match
            # proves the proxy relayed the request.
            ip_str = re.findall(r'\{[\s\S]+\}',r.content.decode('utf-8'))
            if len(ip_str)>0:
                flag = appKey
    except Exception as e:
        traceback.print_exc()
        print(str(e),appKey)
    return flag

class Ti_360(object):
    """Scrape 360 Threat Intelligence (ti.360.net).

    self._hash holds either a file hash (get_filehash) or an IP address
    (get_ipioc) -- the field name is reused for both lookup kinds.
    Requests can be routed through a Basic-auth proxy validated by the
    module-level general() helper.
    """

    def __init__(self,hashes):
        # hashes: a file hash or an IP string, depending on the lookup used.
        self._hash = hashes
        self._request = requests.session()
        self._headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,es;q=0.8,fr;q=0.7,vi;q=0.6',
            'X-Requested-With': 'XMLHttpRequest'
        }
        # NOTE(review): 'someprooxys' is a scrubbed placeholder endpoint.
        self.proxy = {"http": "http://someprooxys", "https": "https://someprooxys"}


    def get_proxy(self):
        """Pick a random proxy appKey and validate it via general().

        Returns the working appKey, or a falsy value when validation fails.
        NOTE(review): the retry calls general() with the SAME key, so it only
        guards against transient network errors, not a bad key.
        """
        import random
        appkey = False
        appKey = random.choice(appKeys)
        appkey = general(appKey)
        if not appkey:
            appkey = general(appKey)
        return appkey

    def get_filehash(self):
        """Look up self._hash as a file hash.

        Returns the first non-empty result table as a JSON string, or None
        on any failure path.
        """
        appKey = self.get_proxy()
        if appKey:
            self._headers["Authorization"] = 'Basic '+ appKey
        # Step 1: hit the HTML search page only to obtain a session cookie.
        url = "https://ti.360.net/search?type=file&value={hash}".format(hash=self._hash)
        if appKey:
            resp = self._request.get(url,headers=self._headers,proxies=self.proxy,timeout=30,verify=False)
        else:
            resp = self._request.get(url,headers=self._headers,timeout=30,verify=False)
        try:
            cookies = resp.headers['Set-Cookie']
            cookie = re.findall('session=(.*?);',cookies)
            if len(cookie)==0:
                return
            self._headers['Referer'] = url
            self._headers['Cookie'] = "session="+str(cookie[0])
        except Exception as e:
            # No Set-Cookie header: continue without a session cookie.
            pass

        # Switch Accept to JSON for the XHR endpoints below.
        del self._headers['Accept']
        self._headers['Accept'] = 'application/json, text/javascript, */*; q=0.01'

        # Step 2: the query endpoint maps the hash to task ids; step 3
        # fetches those tasks' result tables.
        url0 = "https://ti.360.net/ti/query?limit=100&offset=0&page=1&type=file&value={hash}".format(hash=self._hash)
        url1 = "https://ti.360.net/ti/task/{tags}"
        try:
            if appKey:
                resp0 = self._request.get(url0,headers=self._headers,proxies=self.proxy,timeout=30,verify=False)
            else:
                resp0 = self._request.get(url0,headers=self._headers,timeout=30,verify=False)
            if resp0.status_code !=200:
                return
            content0 = json.loads(resp0.content)
            if int(content0['status'])!=200:
                # Body-level status != 200: the source IP has been banned.
                print "your IP in black list"
                return
            # Join all task ids into one comma-separated path segment.
            info = ""
            for name in content0['data']:
                info = info + content0['data'][name]+","

            if len(info)==0:
                return
            url1 = url1.format(tags=str(info))
            if appKey:
                resp1 = self._request.get(url1,headers=self._headers,proxies=self.proxy,timeout=30,verify=False)
            else:
                resp1 = self._request.get(url1,headers=self._headers,timeout=30,verify=False)
            if resp1.status_code !=200:
                return
            content1 = json.loads(resp1.content)

            # Return the first task section that carries a non-empty table.
            for xfile in content1['data']:
                if "table" in str(content1['data'][xfile]):
                    if len(content1['data'][xfile]['table'])>0:
                        tables = content1['data'][xfile]['table'][0]
                        return json.dumps(tables)

        except Exception as why:
            print "error",why
            traceback.print_exc()

    def get_ipioc(self):
        """Look up self._hash as an IP address.

        Aggregates the ip_attribute / ip_ioc_detect / ip_tag /
        ip_try_connect sections into one dict and returns it as a JSON
        string; returns None on any failure path.
        """
        item = {}
        appKey = self.get_proxy()
        if appKey:
            self._headers["Authorization"] = 'Basic '+ appKey
        # Same cookie bootstrap as get_filehash, but for type=ip.
        url = "https://ti.360.net/search?type=ip&value={hash}".format(hash=self._hash)
        if appKey:
            resp = self._request.get(url,headers=self._headers,proxies=self.proxy,timeout=30,verify=False)
        else:
            resp = self._request.get(url,headers=self._headers,timeout=30,verify=False)
        try:
            cookies = resp.headers['Set-Cookie']
            cookie = re.findall('session=(.*?);',cookies)
            if len(cookie)==0:
                return
            self._headers['Referer'] = url
            self._headers['Cookie'] = "session="+str(cookie[0])
        except Exception as e:
            # No Set-Cookie header: continue without a session cookie.
            pass

        # Switch Accept to JSON for the XHR endpoints below.
        del self._headers['Accept']
        self._headers['Accept'] = 'application/json, text/javascript, */*; q=0.01'

        url0 = "https://ti.360.net/ti/query?limit=100&offset=0&page=1&type=ip&value={hash}".format(hash=self._hash)
        url1 = "https://ti.360.net/ti/task/{tags}"
        try:
            if appKey:
                resp0 = self._request.get(url0,headers=self._headers,proxies=self.proxy,timeout=30,verify=False)
            else:
                resp0 = self._request.get(url0,headers=self._headers,timeout=30,verify=False)
            if resp0.status_code !=200:
                return
            content0 = json.loads(resp0.content)
            if int(content0['status'])!=200:
                # Body-level status != 200: the source IP has been banned.
                print "your IP in black list"
                return
            # Join all task ids into one comma-separated path segment.
            info = ""
            for name in content0['data']:
                info = info + content0['data'][name]+","

            if len(info)==0:
                return

            url1 = url1.format(tags=str(info))
            if appKey:
                resp1 = self._request.get(url1,headers=self._headers,proxies=self.proxy,timeout=30,verify=False)
            else:
                resp1 = self._request.get(url1,headers=self._headers,timeout=30,verify=False)
            if resp1.status_code !=200:
                return
            content1 = json.loads(resp1.content)

            # Each task key name identifies its section (substring match).
            for xfile in content1['data']:
                if "ip_attribute" in xfile:
                    # Geo/ASN attributes: copy the table fields verbatim.
                    if "table" in str(content1['data'][xfile]):
                        if len(content1['data'][xfile]['table'])>0:
                            for name in content1['data'][xfile]['table']:
                                item[name] = content1['data'][xfile]['table'][name]
                elif "ip_ioc_detect" in xfile:
                    # IOC detections: collect a deduplicated tag union.
                    ioctag = []
                    if "table" in str(content1['data'][xfile]):
                        if len(content1['data'][xfile]['table'])>0:
                            for name in content1['data'][xfile]['table']:
                                ioctag.extend(name['tags'])
                    if len(list(set(ioctag)))>0:
                        item['ioctag'] = list(set(ioctag))
                elif "ip_tag" in xfile:
                    # Platform tags; the nested try/except ladder falls back
                    # to fewer fields when optional keys are missing.
                    iptag = set()
                    if "table" in str(content1['data'][xfile]):
                        if len(content1['data'][xfile]['table'])>0:
                            for name in content1['data'][xfile]['table']:
                                try:
                                    iptag.add(str(name['platform'])+" "+str(name['type'])+" "+ str(name['tag']))
                                except:
                                    try:
                                        iptag.add(str(name['type'])+" "+ str(name['tag']))
                                    except:
                                        iptag.add(str(name['tag']))

                    if len(list(iptag))>0:
                        item['iptag'] = list(iptag)
                elif "ip_try_connect" in xfile:
                    # Malware families observed connecting to this IP; same
                    # fallback ladder for missing family/type keys.
                    ipconnect = set()
                    if "table" in str(content1['data'][xfile]):
                        for name in content1['data'][xfile]['table']:
                            try:
                                ipconnect.add(str(name['malicious_family']+" " + str(name['malicious_type'])))
                            except:
                                try:
                                    ipconnect.add(str(name['malicious_family']))
                                except Exception as e:
                                    ipconnect.add(str(name['malicious_type']))
                    if len(list(ipconnect))>0:
                        item['iptry'] = list(ipconnect)
            # Tag the aggregate with the queried IP only when something hit.
            if len(item)>0:
                item['ipaddr'] = self._hash
            return json.dumps(item)
        except Exception as why:
            print "error",why
            traceback.print_exc()

测试效果如下

[{"is_proxy": false, "whitelist": "1", "ipaddr": "52.4.209.250", "user_type": "\u5883\u5916IDC", "ioctag": ["SUPPOBOX", "WWW.DSHIELD.ORG", "BOTNET", "REV 3807", "C&C", "MALICIOUS", "TROJAN-ACTIVITY", "HTTPS://ZEUSTRACKER.ABUSE.CH/BLOCKLIST.PHP?DOWNLOAD=IPBLOCKLIST", "DDOS", "DDOS TARGET", "HTTPS://ZEUSTRACKER.ABUSE.CH/BLOCKLIST.PHP?DOWNLOAD=BADIPS"], "proxy_type": "", "location": "\u7f8e\u56fd/\u5f17\u5409\u5c3c\u4e9a\u5dde/\u963f\u4ec0\u672c", "iptag": ["RTF", "Windows \u8fdc\u63a7\u6728\u9a6c SUPPOBOX", "RANSOMWARE", "Windows \u8fdc\u63a7\u6728\u9a6c RANBYUS", "Windows \u8fdc\u63a7\u6728\u9a6c TINBA", "Windows \u8fdc\u63a7\u6728\u9a6c RAMDO", "LOCKY", "Windows \u8fdc\u63a7\u6728\u9a6c RAMNIT"], "asn": "AS14618 Amazon.com, Inc.", "is_idc": true},
{"is_proxy": false, "whitelist": "1", "ipaddr": "185.198.59.121", "user_type": "\u5883\u5916IDC", "ioctag": ["C2", "DDOS", "MALICIOUS"], "proxy_type": "", "location": "\u7f57\u9a6c\u5c3c\u4e9a/\u5e03\u52a0\u52d2\u65af\u7279", "iptag": ["Windows \u7a83\u5bc6\u6728\u9a6c FORMBOOK", "CVE-2017-11882"], "asn": "AS60117 Host Sailor Ltd.", "is_idc": true},
{"is_proxy": false, "whitelist": "1", "ipaddr": "23.236.76.75", "user_type": "\u5883\u5916IDC", "asn": "AS134835 Starry Network Limited", "location": "\u7f8e\u56fd/\u52a0\u5229\u798f\u5c3c\u4e9a\u5dde/\u6d1b\u6749\u77f6", "ioctag": ["DDOS", "MALICIOUS"], "proxy_type": "", "is_idc": true},
{"is_proxy": false, "whitelist": "1", "ipaddr": "91.229.79.184", "user_type": "\u5883\u5916IDC", "asn": "AS42331 PE Freehost", "location": "\u4e4c\u514b\u5170/\u57fa\u8f85", "iptag": ["PATCHWORK", "DROPPING ELEPHANT", "APT"], "proxy_type": "", "is_idc": true}]


这里还需要增加一个机制,一旦匹配失败需要标记重新匹配。不然一旦访问出错就没有办法匹配到了

demo的结果如下.

后续想着对全部数据进行入库展示,利用mongodb或者mysql来作为后端数据库支持。前端利用flask或者tornado 来展示。

PS:后面改进了,利用loki的IOC来识别了

分类
最新文章
最近回复
  • 没穿底裤: 最近发现的新版本可以装在LINUX了。但是API有点变化
  • 没穿底裤: 暂时好像没有看到这个功能.
  • 没穿底裤: 这个只是一个分析,并不是使用方法哟
  • 没穿底裤: 抱歉,很久没有打理了。会不会你使用的是12版本。目前还没有遇到过这种情况
  • bao song: http://0cx.cc/php_decode_shell.jspx 这个怎么用,代码提示...