Using Beats to monitor systems

Published: December 10, 2018 // Categories: ops, dev notes, linux, windows, misc // No Comments

Monitoring is done mainly with winlogbeat and auditbeat.

For the ELK installation, update the versions below to the latest release as you see fit:

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.2.rpm
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.4.2-x86_64.rpm
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.2.rpm

rpm -ivh elasticsearch-6.4.2.rpm 
sudo chkconfig --add elasticsearch
/etc/init.d/elasticsearch start

rpm -ivh kibana-6.4.2-x86_64.rpm 
/etc/init.d/kibana start
sudo chkconfig --add kibana

rpm -ivh logstash-6.4.2.rpm
cd /usr/share/logstash
ln -s /etc/logstash ./config

1. Windows

For now this is only practical on Windows 7 and above, mainly because upgrading PowerShell is straightforward there. One odd requirement is that the system has to be a genuine copy; if it is not, activate it yourself. For the related setup, see the earlier post on logging command execution under Windows.
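
For reference, here is a minimal sketch (my own assumption, not an excerpt from that post) of turning on PowerShell module and script block logging through the policy registry keys, run from an elevated prompt. Script block logging needs PowerShell 5.x, which is why upgrading PowerShell matters:

rem enable script block logging (event 4104 in Microsoft-Windows-PowerShell/Operational)
reg add "HKLM\SOFTWARE\Policies\Microsoft\Windows\PowerShell\ScriptBlockLogging" /v EnableScriptBlockLogging /t REG_DWORD /d 1 /f
rem enable module logging for all modules
reg add "HKLM\SOFTWARE\Policies\Microsoft\Windows\PowerShell\ModuleLogging" /v EnableModuleLogging /t REG_DWORD /d 1 /f
reg add "HKLM\SOFTWARE\Policies\Microsoft\Windows\PowerShell\ModuleLogging\ModuleNames" /v * /t REG_SZ /d * /f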

The monitoring here works by enabling the various command-execution audit settings so that the records land in the Windows event logs, and then using winlogbeat to watch those logs. Next, install the Winlogbeat service:

PS C:\Users\Administrator> cd 'C:\Program Files\Winlogbeat'
PS C:\Program Files\Winlogbeat> .\install-service-winlogbeat.ps1

Security warning
Run only scripts that you trust. While scripts from the internet can be useful,
this script can potentially harm your computer. If you trust this script, use
the Unblock-File cmdlet to allow the script to run without this warning message.
Do you want to run C:\Program Files\Winlogbeat\install-service-winlogbeat.ps1?
[D] Do not run  [R] Run once  [S] Suspend  [?] Help (default is "D"): R

Status   Name               DisplayName
------   ----               -----------
Stopped  winlogbeat         winlogbeat

The relevant configuration file, winlogbeat.yml:

###################### Winlogbeat Configuration Example ##########################

# This file is an example configuration file highlighting only the most common
# options. The winlogbeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/winlogbeat/index.html

#======================= Winlogbeat specific options ==========================

# event_logs specifies a list of event logs to monitor as well as any
# accompanying options. The YAML data type of event_logs is a list of
# dictionaries.
#
# The supported keys are name (required), tags, fields, fields_under_root,
# forwarded, ignore_older, level, event_id, provider, and include_xml. Please
# visit the documentation for the complete details of each option.
# https://go.es.io/WinlogbeatConfig

winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
    fields:
      log_type: windowsevt
  - name: Security
    fields:
      log_type: windowsevt
  - name: System
    fields:
      log_type: windowsevt
  - name: Windows PowerShell
    fields:
      log_type: windowsevt
  - name: Microsoft-Windows-PowerShell/Operational
    fields:
      log_type: windowsevt
  - name: Microsoft-Windows-WMI-Activity/Operational
    fields:
      log_type: windowsevt
  - name: Microsoft-Windows-RemoteDesktopServices-RdpCoreTS/Operational
    fields:
      log_type: windowsevt
  - name: Microsoft-Windows-Sysmon/Operational
    fields:
      log_type: windowsevt 
#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.

  # Optional protocol and basic auth credentials.
  #protocol: "http"
#----------------------------- Logstash output --------------------------------
output.logstash:
  #The Logstash hosts
  hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

#================================ Logging =====================================
# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
logging.to_files: true
logging.files: 
  path: C:/ProgramData/winlogbeat/Logs
logging.level: info

Then start the winlogbeat service:

net start winlogbeat
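
If the service refuses to start, winlogbeat 6.x ships a test subcommand that can sanity-check the configuration and the output connection (an aside of mine, not from the original post):

PS C:\Program Files\Winlogbeat> .\winlogbeat.exe test config -c .\winlogbeat.yml -e
PS C:\Program Files\Winlogbeat> .\winlogbeat.exe test output -c .\winlogbeat.yml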

Change the host under output.logstash to the machine where ELK is installed. The logstash configuration on that machine is as follows:

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}

filter {

  if ([fields][log_type] == "windowsevt") {

    mutate {
      add_field => { "[orig_message]" => "%{message}" }
    }

    # replace newlines in the message field with spaces
    mutate {
      gsub => [
        "message", "\r\n", " ",
        "message", "\n", " "
      ]
    }

    # parse the message field of events 400 and 403
    if ([event_id] == 403 or [event_id] == 400) {
      grok {
        match => { "message" => "%{GREEDYDATA:[event_data][msg]}\sDetails:\s*NewEngineState\s*=\s*%{GREEDYDATA:[event_data][details][newenginestate]}\s*PreviousEngineState\s*=\s*%{GREEDYDATA:[event_data][details][previousenginestate]}\s*SequenceNumber\s*=\s*%{INT:[event_data][details][sequencenumber]}\s*HostName\s*=\s*%{GREEDYDATA:[event_data][details][hostname]}\s*Host\s*Version\s*=\s*%{GREEDYDATA:[event_data][details][hostversion]}\s*Host\s*Id\s*=\s*%{GREEDYDATA:[event_data][details][hostid]}\s*Host\s*Application\s*=\s*%{GREEDYDATA:[event_data][details][hostapplication]}\s*Engine\s*Version\s*=\s*%{GREEDYDATA:[event_data][details][engineversion]}\s*Runspace\s*Id\s*=\s*%{GREEDYDATA:[event_data][details][runspaceid]}\s*Pipeline\s*Id\s*=\s*%{GREEDYDATA:[event_data][details][pipelineid]}\s*Command\s*Name\s*=\s*%{GREEDYDATA:[event_data][details][commandname]}\s*Command\s*Type\s*=\s*%{GREEDYDATA:[event_data][details][commandtype]}\s*Script\s*Name\s*=\s*%{GREEDYDATA:[event_data][details][scriptname]}\s*Command\s*Path\s*=\s*%{GREEDYDATA:[event_data][details][commandpath]}\s*Command\s*Line\s*=\s*%{GREEDYDATA:[event_data][details][commandline]}" }
      }
    }
    # parse the message field of event 600
    if ([event_id] == 600) {
      grok {
        match => { "message" => "%{GREEDYDATA:[event_data][msg]}\sDetails:\s*ProviderName\s*=\s*%{GREEDYDATA:[event_data][details][providername]}\s*NewProviderState\s*=\s*%{GREEDYDATA:[event_data][details][newproviderstate]}\s*SequenceNumber\s*=\s*%{INT:[event_data][details][sequencenumber]}\s*HostName\s*=\s*%{GREEDYDATA:[event_data][details][hostname]}\s*Host\s*Version\s*=\s*%{GREEDYDATA:[event_data][details][hostversion]}\s*Host\s*Id\s*=\s*%{GREEDYDATA:[event_data][details][hostid]}\s*Host\s*Application\s*=\s*%{GREEDYDATA:[event_data][details][hostapplication]}\s*Engine\s*Version\s*=\s*%{GREEDYDATA:[event_data][details][engineversion]}\s*Runspace\s*Id\s*=\s*%{GREEDYDATA:[event_data][details][runspaceid]}\s*Pipeline\s*Id\s*=\s*%{GREEDYDATA:[event_data][details][pipelineid]}\s*Command\s*Name\s*=\s*%{GREEDYDATA:[event_data][details][commandname]}\s*Command\s*Type\s*=\s*%{GREEDYDATA:[event_data][details][commandtype]}\s*Script\s*Name\s*=\s*%{GREEDYDATA:[event_data][details][scriptname]}\s*Command\s*Path\s*=\s*%{GREEDYDATA:[event_data][details][commandpath]}\s*Command\s*Line\s*=\s*%{GREEDYDATA:[event_data][details][commandline]}" }
      }
    }

    # standardize the IP address field
    if ([event_data][IPString]) {
      mutate {
        rename => { "[event_data][IPString]" => "[remote_ip]" }
      }
    }

    # standardize the IP address field
    if ([event_data][ClientAddress]) {
      mutate {
        rename => { "[event_data][ClientAddress]" => "[remote_ip]" }
      }
    }

    # standardize the IP address field
    if ([event_data][IpAddress]) {
      grok {
        match => { "[event_data][IpAddress]" => "%{IPV4:[remote_ip]}" }
      }
    }

    # split remote IP and port
    if ([event_data][ClientIP]) {
      mutate {
        split => ["[event_data][ClientIP]", ":"]
        add_field => { "[remote_ip]" => "%{[event_data][ClientIP][0]}" }
        add_field => { "[port]" => "%{[event_data][ClientIP][1]}" }
        remove_field => [ "[event_data][ClientIP]" ]
      }
    }

    # add GeoIP information
    geoip {
      source => "[remote_ip]"
      target => "[geoip]"
    }
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "windowsevt-%{+YYYY.MM.dd}"
    manage_template => false
  }
  stdout { codec => rubydebug }
}

Of course, this can also be trimmed down to:

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}


output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "auditbeat-%{+YYYY.MM.dd}"
    manage_template => false
  }
  stdout { codec => rubydebug }
}

Start logstash:

./bin/logstash -f config/conf.d/winevtx.conf
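
Optionally, validate the pipeline syntax first (assuming logstash 6.x, which supports this flag):

./bin/logstash -f config/conf.d/winevtx.conf --config.test_and_exit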

This machine's firewall also needs to allow the relevant hosts to reach port 5044 (an example is sketched below). Then create the index pattern in Kibana.
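
For example, on CentOS 7 with firewalld (adjust for iptables-based systems; these commands are an illustration, not from the original post):

firewall-cmd --permanent --add-port=5044/tcp
firewall-cmd --reload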

Test by executing WMI:


Unconventional usage:

The drawback is equally obvious: once sysmon and winlogbeat are stopped, nothing more gets collected.

2. Linux

I originally planned to build this with auditd, then discovered that Elastic has a neat little thing called Auditbeat. Most write-ups online send its output straight to Elasticsearch or Kibana; sending it to logstash first seems rare, so here are my notes.

wget https://artifacts.elastic.co/downloads/beats/auditbeat/auditbeat-6.5.2-x86_64.rpm
rpm -ivh auditbeat-6.5.2-x86_64.rpm
sudo chkconfig --add auditbeat
# edit /etc/auditbeat/auditbeat.yml
mv /etc/auditbeat/auditbeat.yml /etc/auditbeat/auditbeat.yml_bak
wget http://0cx.cc/ps/auditbeat.yml -O /etc/auditbeat/auditbeat.yml

The parts worth modifying:

- module: file_integrity
  paths:
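
A slightly fuller sketch of what this part of auditbeat.yml, plus the output section, might look like (the extra directories and the hostname below are examples of mine, not values from the original config):

- module: file_integrity
  paths:
    - /bin
    - /usr/bin
    - /etc
    - /root/.ssh                     # add more directories as absolute paths

output.logstash:
  hosts: ["your-elk-host:5045"]      # example hostname; point at your own ELK machine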

To monitor more directories, keep adding entries under paths; each path must be absolute (as in the sketch above). The other place to change is output.logstash, which should point at your own ELK machine. Combined with the bash command-execution auditing written up earlier, this gives continuous monitoring. Save the file and restart auditbeat:

service auditbeat restart

Create the logstash configuration that receives the auditbeat data:

vim auditbeat.conf
input {
  beats {
    port => 5045
    host => "0.0.0.0"
  }
}


output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "auditbeat-%{+YYYY.MM.dd}"
    manage_template => false
  }
  stdout { codec => rubydebug }
}

Likewise, open port 5045 in this host's firewall, then create the index pattern in Kibana. One annoying gotcha: each monitored machine needs a proper hostname, since Linux installs often default to localhost.localdomain; an example follows.
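
For example, assuming firewalld and systemd (the hostname below is made up):

firewall-cmd --permanent --add-port=5045/tcp
firewall-cmd --reload
hostnamectl set-hostname web-01.example.com   # give the box a meaningful name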

The drawback is equally obvious: once the auditbeat service is stopped, that is the end of collection.

References

https://github.com/margusmaki/ELK
https://raw.githubusercontent.com/Mosuan/AuditdPy/master/docs/rule.txt

Using RELK for log collection

Published: April 3, 2018 // Categories: ops, dev notes, python // No Comments

Not long ago, during incident response work, I kept being asked to analyze logs and trace attacks back to their source. It occurred to me that if the common log formats were parsed and stored in one place, statistics on the relevant IPs and URLs could then be compiled, which makes tracing very convenient. Since the data volume is large and it has to stay easy to analyze, ELK came to mind.

Build a log-analysis system based on ELK.
The host runs CentOS with 4 GB of RAM and two cores.

The rough architecture looks like this:

1. ELK setup

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.2.rpm
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.4.2-x86_64.rpm
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.2.rpm

rpm -ivh elasticsearch-6.4.2.rpm 
sudo chkconfig --add elasticsearch
/etc/init.d/elasticsearch start

rpm -ivh kibana-6.4.2-x86_64.rpm 
/etc/init.d/kibana start
sudo chkconfig --add kibana

rpm -ivh logstash-6.4.2.rpm
cd /usr/share/logstash
ln -s /etc/logstash ./config

With the whole ELK stack in place, install Redis as the broker that receives logs from the agents and serves as logstash's input source:

wget http://download.redis.io/redis-stable.tar.gz
tar zxf redis-stable.tar.gz 
cd redis-stable
make && make install

Modify redis.conf:

bind 0.0.0.0
protected-mode no
daemonize yes
maxclients 1000000
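
Note that the logstash redis input shown later in this post authenticates with a password; for that to work, Redis needs the matching directive here as well (this line is my addition, not part of the original config):

requirepass password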

Start Redis:

sudo cp redis.conf /etc/
redis-server /etc/redis.conf

Logstash pipeline configuration files live in /etc/logstash/conf.d and use logstash's own JSON-like configuration syntax. A pipeline consists of three parts: input, filter, and output.

input: the data ingestion stage; it can receive source data from almost anywhere, for example:
  • file: read events from a file
  • syslog: listen on port 514 for syslog messages and parse them per RFC 3164
  • redis: fetch events from a redis-server list
  • beats: receive events from Filebeat
filter: the transformation stage, handling format processing, type conversion, filtering, and adding or modifying fields. Commonly used filters:
  • grok: parse and structure arbitrary text with regular expressions. Grok is currently the best way in logstash to turn unstructured log data into something structured and queryable, and it ships with 120 built-in patterns that cover most needs.
  • mutate: perform general transformations on event fields; rename, remove, replace, and modify fields.
  • drop: discard an event entirely, e.g. debug events.
  • clone: duplicate an event, optionally adding or removing fields.
  • geoip: add geographic information about an IP address.
output: the final stage of the pipeline, responsible for shipping data to its destination; it integrates with most applications. Common outputs:
  • elasticsearch: send event data to Elasticsearch for querying, analysis, and visualization.
  • file: write event data to a file on disk.
  • mongodb: send event data to MongoDB, a high-performance NoSQL store, for long-term storage, querying, analysis, and sharding of large data sets.
  • redis: send data to a redis-server, commonly used as a temporary buffer in the middle tier.
  • graphite: send event data to Graphite. http://graphite.wikidot.com/
  • statsd: send event data to statsd.

Now write the logstash configuration file. It accepts everything wholesale; thanks to Mosuan for the guidance.

input {
    redis {
        host => '127.0.0.1'
        port => 6379
        password => 'password'
        data_type => 'list'
        key => 'logstash:redis'
    }
}
output {
    elasticsearch { hosts => ["localhost"] }
    stdout { codec => rubydebug }
}
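
A quick end-to-end test is to push a message onto the list yourself and watch it come out of the rubydebug stdout (the payload below is just a made-up test string):

redis-cli -a password RPUSH logstash:redis '{"message": "hello from redis"}'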

Logpara

A rough demo that parses and processes common web-server logs.

Python 2.7

Goals

  • Parse requested URLs and flag common attack patterns
  • Enrich visitor IPs with latitude/longitude and physical location
  • Parse visitor User-Agents into device, browser type, and whether it is a crawler
  • Parse all logs into the database and run them through RELK

TO DO

  • Process and visualize the logs stored in elasticsearch

Usage

  • Before use, edit common/units.py:

    redis_host = '192.168.87.222'
    redis_port = 6379
    redis_pass = 'cft67ygv'
    redis_db = 0
    redis_key = 'logstash:redis'
    
  • Run it:
Usage: main.py --type IIS|Apache|Tomcat|Nginx --file file|directory

log parser

Options:
  -h, --help   show this help message and exit
  --type=TYPE  chose which log type
  --file=FILE  chose file or directory

Repository: https://github.com/0xa-saline/Logpara

The imported results look something like this:


Viewing a single record:

With nginx you can also collect POST data, which is a great help for tracing, forensics, and log analysis; a rough example is sketched below.
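
As a sketch of the idea (not part of the original post): nginx can write request bodies into the access log via a custom log_format, but $request_body is only populated when nginx actually reads the body, e.g. when the location proxies to an upstream. The paths and upstream address below are placeholders.

http {
    # custom format that appends the raw request body
    log_format postdata '$remote_addr - [$time_local] "$request" '
                        '$status "$http_user_agent" "$request_body"';

    server {
        listen 80;
        access_log /var/log/nginx/postdata.log postdata;

        location / {
            proxy_pass http://127.0.0.1:8080;   # example upstream; adjust to your app
        }
    }
}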