记录日常工作关于系统运维,虚拟化云计算,数据库,网络安全等各方面问题。

filebeat7.10+logstash7.10+Elastic7.10+kibana7.10收集centos系统ssh日志IP并将IP地图化展示


1,配置信息如下:

IP 系统 JDK Eastic Filebeat logstash Kibana
192.168.137.30 Centos7 openjdk-11 elastic7.10 ------ logstash7.10 Kibana7.10
192.168.137.26 Centos7 openjdk-11 elastic7.10 Filebeat7.10 --------- -----------


2,具体安装情况就不详细说了。

elastic7组建集群,使用源码包安装,JDK使用yum安装。

filebeat,logstash,kibana都是使用RPM包安装,比较简单,可以到华为云镜像网站下载安装包。

filebeat收集日志----- logstash过滤重构------ES保存数据----Kibana展示地图


3,以下是filebeat的配置文件信息。


[root@centos6mini filebeat]# cat filebeat.yml

filebeat.inputs:

 - type: log

   enabled: true

   paths:

     - /var/log/secure

#   include_lines: ['Bad']

   tags:

     ["secure-log"]

   fields:

     service: "ssh"


# - type: log

#   enabled: true

#   paths:

#     - /www/wwwlogs/yjvps.com-ssl_access_log

#   include_lines: ['sshd', 'sudo']

#   tags:

#     ["httpd-log"]

#   fields:

#     service: "httpd"


setup.template.settings:

  index.number_of_shards: 1

output.logstash:

  hosts: ["192.168.137.30:5043"]


其中 include_lines可以匹配相关行日志。


4,以下是logstash的ssh.conf 的相关配置文件内容:


[root@k8s-30 ~]# cat /etc/logstash/conf.d/ssh.conf

# Sample Logstash configuration for creating a simple

# Beats -> Logstash -> Elasticsearch pipeline.



input {

    beats {

        port => 5043

    }

}


filter {

    grok {

        match => {

            "message" => "%{MONTH:yuefen} %{MONTHDAY:riqi} %{TIME:shijian} %{HOSTNAME:zhoujiming} .* %{IPV4:ip} .*"

        }

        overwrite => ["message"]

    }

    mutate {

        remove_field => [ "@message","yuefen","riqi","shijian","@version","beat","hosts","input","log","offset","prospector" ]

    }

}


output {

    elasticsearch {

        hosts => ["http://localhost:9200"]

        index => "filebeat-ssh-ip"

        user => "elastic"

        password => "123456"

    }

}

     

"%{MONTH:yuefen} %{MONTHDAY:riqi} %{TIME:shijian} %{HOSTNAME:zhoujiming} .* %{IPV4:remoteip} .*"

可以过滤相关日志,也可以去掉不要的字段。

同时logstash传到ES的数据,所建索引名: filebeat-ssh-ip


5,相关软件启动后,可以到elastic中查看会多出一个filebeat-ssh-时间的索引并上传相关日志字段,

       但是需要将IP转换成geoip地理坐标相关信息,然后geoip的location字段是float,并不是geoip_point,所以kibana地图不能识别数据。

   (1) 这里,先创建一个采集节点管道,这里直接在kibana的Web界面上建立管道,非常简单,

    管道名称: geoip_pipeline  引用的字段名称:  ip (也就是logstash提取的ip字段),

如下图:


image.png


(2)然后geoip的location字段是float,并不是geoip_point,所以kibana地图不能识别数据。

这里可以采用索引模板来解决,可以通过kibana来创建一个新索引模板,

    模板名称:  ssh

    模板设置: 

"number_of_shards": "1",

"default_pipeline": "geoip_pipeline",

"number_of_replicas": "0"


    

如下图:

1) ,填写名称ssh,要匹配的索引模式: filebeat-ssh* ,再加优先级最高,这样就可以给filebeat-ssh-log索引使用了,

 image.png


2)组件模板可以不用填写,直接在索引设置中引用节点管道,根据自己情况填写索引设置相关参数。

image.png


3)映射 选项中 先 添加 geoip 字段,类型 :  对象 ,添加完成后,再点 对应 geoip 行的 + 符号,

      添加子字段 location ,类型:  地理坐标点 。 

      高级选项中的内容,不需要的可以关掉。

image.png


4)最后保存,模板就建立好了,最终应用模板内容如下:


{
  "template": {
    "settings": {
      "index": {
        "number_of_shards": "1",
        "number_of_replicas": "0",
        "default_pipeline": "geoip_pipeline"
      }
    },
    "mappings": {
      "dynamic": "true",
      "dynamic_templates": [
        
      ],
      "date_detection": false,
      "numeric_detection": false,
      "properties": {
        "geoip": {
          "properties": {
            "location": {
              "type": "geo_point"
            }
          }
        }
      }
    },
    "aliases": {
      
    }
  }
}


6)启动 filebeat,logstash,es,kibana后,可以选好索引模式,再到kibana的discaver中查看相关内容。


image.png


7,如上图,只有显示 一个 地球的图标,才可以正常实现IP地理可视化。

点地球图标,出现对话框,然后点   可视化,会自动转好地图图层。


image.png


8,最后根据自己情况调整地图,如下:


image.png




转载请标明出处【filebeat7.10+logstash7.10+Elastic7.10+kibana7.10收集centos系统ssh日志IP并将IP地图化展示】。

《www.micoder.cc》 虚拟化云计算,系统运维,安全技术服务.

网站已经关闭评论