16 Introducing Kafka between Filebeat and Logstash

Architecture:
Filebeat ----> Kafka ----> Logstash (regex parsing) ----> Elasticsearch (indexing) ----> Kibana (display)

Deployed services:
  192.168.189.83: Kibana, ES
  192.168.189.84: Logstash
  192.168.189.73: Filebeat, Nginx
  192.168.189.92: Kafka
1. Write the data Filebeat collects into Kafka

Modify the Filebeat config file (on 192.168.189.73):
cd /usr/local/filebeat-6.6.0 && cp filebeat.yml filebeat.yml.20190613
vim filebeat.yml
filebeat.inputs:
- type: log
  tail_files: true          # start reading from the end of the file
  backoff: "1s"             # re-check the file for new lines every 1s
  paths:
    - /usr/local/nginx/logs/access.json.log
  fields:
    type: access            # custom field, used for routing in Logstash
  fields_under_root: true   # place "type" at the top level of the event
output.kafka:
  hosts: ["192.168.189.92:9092"]
  topic: web01
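With Kafka's default auto.create.topics.enable=true, the web01 topic is created automatically on first write. To confirm events are actually arriving, a quick check with the console consumer that ships with Kafka (same install path as used later in this section):

cd /usr/local/kafka_2.12/bin
# print every message in the topic from the beginning; Ctrl+C to stop
./kafka-console-consumer.sh --bootstrap-server 192.168.189.92:9092 --topic web01 --from-beginning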



Save and exit, then restart Filebeat:
pkill filebeat
nohup /usr/local/filebeat-6.6.0/filebeat -e -c /usr/local/filebeat-6.6.0/filebeat.yml >/tmp/filebeat.log 2>&1 &
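The YAML can be validated with Filebeat's built-in check, and any startup errors will show up in the nohup log:

/usr/local/filebeat-6.6.0/filebeat test config -c /usr/local/filebeat-6.6.0/filebeat.yml
tail -f /tmp/filebeat.log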

2. Modify the Logstash config file (on 192.168.189.84)
cd /usr/local/logstash-6.6.0/config && cp logstash.conf logstash.conf.20190613
vim logstash.conf
input {
  kafka {
    bootstrap_servers => "192.168.189.92:9092"
    topics => ["web01"]
    group_id => "web01"
    codec => "json"   # decode the Filebeat event envelope
  }
}
filter {
  if [type] == "access" {
    json {
      # parse the nginx JSON log line carried in the "message" field
      source => "message"
      remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
    }
  }
}
output {
  if [type] == "access" {
    # skip requests coming from localhost
    if "127.0.0.1" not in [clientip] {
      elasticsearch {
        hosts => ["http://192.168.189.83:9200"]
        index => "web01-%{+YYYY.MM.dd}"
      }
    }
  }
}
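Note the two JSON layers here: codec => "json" on the kafka input decodes the Filebeat event envelope (which is where [type] comes from), and the json filter then parses the raw nginx log line held in the message field. The syntax can be checked before restarting:

/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/config/logstash.conf --config.test_and_exit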

Save and exit, then restart Logstash.
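A minimal restart sketch, assuming Logstash is run under nohup the same way Filebeat is above:

pkill -f logstash
nohup /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/config/logstash.conf >/tmp/logstash.log 2>&1 &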

Clear the old index in Kibana so that only the newest logs are used.
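The old index can also be dropped directly through the Elasticsearch REST API instead of the Kibana UI (assuming ES on 192.168.189.83:9200 as configured above, with wildcard deletes at their 6.x default of allowed):

curl -XDELETE 'http://192.168.189.83:9200/web01-*'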

From another machine, access 192.168.189.73, then check the data and index in Kibana.
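Traffic can also be generated and the index checked from the command line, a quick sanity check using the hosts configured above:

# generate a few access-log entries on the nginx host
curl http://192.168.189.73/
# confirm the daily index exists and its document count is growing
curl 'http://192.168.189.83:9200/_cat/indices?v' | grep web01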



Create the index pattern: in Kibana, go to Management → Index Patterns and add a pattern matching web01-*.

Inspecting Kafka queue status (on 192.168.189.92)

cd /usr/local/kafka_2.12/bin
List consumer groups:
./kafka-consumer-groups.sh --bootstrap-server 192.168.189.92:9092 --list



Describe the queue for the web01 group:
./kafka-consumer-groups.sh --bootstrap-server 192.168.189.92:9092 --group web01 --describe
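In the describe output, CURRENT-OFFSET is how far the web01 group (Logstash) has consumed each partition, LOG-END-OFFSET is the latest offset Filebeat has produced, and LAG is the difference between the two; a LAG that keeps growing means Logstash is falling behind the incoming log volume.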