ELK_14 ELK 使用kafka集群缓存日志

架构:
Filebeat收集---> kafka集群 缓存---> Logstash取出----> Elasticsearch存入----> Kibana展现

Kafka:
Kafka依赖于Zookeeper
两个都依赖于Java
实验环境规划:
192.168.189.118 elk01 ------>nginx filebeat kafa zookeeper
192.168.189.119 elk02 ------>logstash kafa zookeeper
192.168.189.120 elk03 ------>elasticsearch kibana kafa zookeeper
硬件资源有限,所以kafka 集群 配置在 elk01--->elk02---->elk03 上面

nginx 日志要转成json格式
三台服务器都要安装java环境
yum install java-1.8.0-openjdk.x86_64 -y
+++++
一、安装elasticsearch kibana
192.168.189.120 elk03
mkdir -p /data/es_soft && cd /data/es_soft
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.rpm
##官网下载非常慢
wget http://www.xchinagroup.top/softdown/centos7/13_elk/elasticsearch-6.6.0.rpm

rpm -ivh elasticsearch-6.6.0.rpm
cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak
vim /etc/elasticsearch/elasticsearch.yml
node.name: node-1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 192.168.189.120,127.0.0.1
http.port: 9200
保存,退出。
mkdir -p /data/elasticsearch
chown -R elasticsearch:elasticsearch /data/elasticsearch
systemctl daemon-reload
systemctl enable elasticsearch
systemctl start elasticsearch


二、安装kibana ---->192.168.189.120 elk03
cd /data/es_soft/
wget http://www.xchinagroup.top/softdown/centos7/13_elk/kibana-6.6.0-x86_64.rpm
rpm -ivh kibana-6.6.0-x86_64.rpm
cp /etc/kibana/kibana.yml /etc/kibana/kibana.yml.bak

vim /etc/kibana/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
server.name: "elk03"
elasticsearch.hosts: ["http://localhost:9200"]
kibana.index: ".kibana"

保存退出。
systemctl enable kibana
systemctl start kibana

+++++++++++++
三、安装zookeeper集群:
zookeeper集群特性:整个集群中只要有超过集群数量一半的zookeeper工作是正常的,那么整个集群对外就是可用的,例如有2台服务器做了一个zookeeper,只要有任何一台故障或宕机,那么这个zookeeper集群就是不可用的了.因为剩下的一台没有超过集群的一半的数量,但是假如有三台zookeeper组成一个集群,那么损坏一台还剩两台,大于3台的一半,所以损坏一台还是可以正常运行的,但是再损坏一台就只剩下一台,集群就不可用了.

如果是4台组成,损坏一台正常,损坏两台还剩两台,不满足集群总数的一半,所以3台的集群和4台的集群算坏两台的结果都是集群不可用.所以这也是为什么集群一般是奇数的原因.

+++++++++
3.1、先把三个节点的主机名分别加入到hosts文件中,并保证能互相ping通

3.2、192.168.189.118 elk01 安装zookeeper
zookeeper下载地址
http://zookeeper.apache.org/releases.html
https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8.tar.gz

此次实验用的版本是3.5.8
cd /root
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8.tar.gz
tar zxf apache-zookeeper-3.5.8.tar.gz -C /opt
ln -s /opt/apache-zookeeper-3.5.8/ /opt/zookeeper

创建zookeeper数据目录
mkdir -p /data/zookeeper
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg

vim /opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=192.168.189.118:2888:3888
server.2=192.168.189.119:2888:3888
server.3=192.168.189.120:2888:3888
保存退出。

创建myid文件
myidzookeeper集群用来发现彼此的标识,必须创建,且不能相同。

echo "1" > /data/zookeeper/myid
这里的1需要和你的配置文件中的server.后面的数字一样
+++++配置文件说明:
2888端口号是zookeeper服务之间通信的端口
3888端口是zookeeper与其他应用程序通信的端口。
tickTime:C(clinet)-S(server)通信心跳时间
Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每tickTime 时间就会发送一个心跳。

tickTime以毫秒为单位。
tickTime:该参数用来定义心跳的间隔时间,zookeeper的客户端和服务端之间也有和web开发里类似的session的概念,而zookeeper里最小的session过期时间就是tickTime的两倍initLimit:L(leader)-F(follower)初始通信时限
集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量)
syncLimit:L(leader)-F(follower)初始通信时限
集群中的follower服务器(F)与leader服务器(L)之间 请求和应答 之间能容忍的最多心跳数(tickTime的数量)
++++++++
3.3、另外两个节点安装zookeeper
189.119 elk02 189.120 elk03 安装zookeeper,步骤与elk01一样,只是myid不一样而已。

189.119 elk02----> myid 2
189.120 elk03----> myid 3

各节点启动zookeeper
/opt/zookeeper/bin/zkServer.sh start

查看各节点状态:/opt/zookeeper/bin/zkServer.sh status






查看各节点zookeeper日志:






排错:
登录zookeeper网站 https://zookeeper.apache.org/releases.html 

查找文档




从3.5.5版本开始,带有bin名称的包才是下载后可以直接使用的,里面有编译后的二进制的包,而之前的普通的tar.gz的包里面是只是源码的包无法直接使用。

3.4、重新下载,并安装。
++++++++189.118 189.119 189.120 操作步骤一样
cd /root
cp /opt/zookeeper/conf/zoo.cfg .
rm -rf /opt/zookeeper
rm -rf /opt/apache-zookeeper-3.5.8

wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar zxf apache-zookeeper-3.5.8-bin.tar.gz -C /opt/
ln -s /opt/apache-zookeeper-3.5.8-bin/ /opt/zookeeper
cp /root/zoo.cfg /opt/zookeeper/conf/
+++++++++
再次启动三个节点:
/opt/zookeeper/bin/zkServer.sh start
查看各节点状态:
/opt/zookeeper/bin/zkServer.sh status


zookeeper简单操作命令
连接到任意节点生成数据:
我们在节点1生成数据,然后在其他节点验证数据
/opt/zookeeper/bin/zkCli.sh -server elk02:2181



/opt/zookeeper/bin/zkCli.sh -server elk01:2181



/opt/zookeeper/bin/zkCli.sh -server elk03:2181



quit 或者 crtl + d 退出。

四、安装并测试kafka(三个节点上都要安装)
kafka下载地址 http://kafka.apache.org/downloads.html

cd /root && wget https://mirror.bit.edu.cn/apache/kafka/2.5.1/kafka_2.12-2.5.1.tgz

tar zxf kafka_2.12-2.5.1.tgz -C /opt/
ln -s /opt/kafka_2.12-2.5.1/ /opt/kafka
mkdir /opt/kafka/logs
cp /opt/kafka/config/server.properties /opt/kafka/config/server.properties.bak

vim /opt/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.189.118:9092
log.dirs=/opt/kafka/logs
log.retention.hours=24
zookeeper.connect=192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181






保存,退出。
各节点启动kafka节点1,可以先前台启动,方便查看错误日志
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties







最后一行出现KafkaServer idstarted字样,就表明启动成功了,然后就可以放到后台启动了
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
tail -f /opt/kafka/logs/server.log ##查看日志






+++++++
测试创建topic
创建名为kafkatest,partitions(分区)为3,replication(复制)为3的topic(主题),在任意机器操作即可
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --partitions 3 --replication-factor 3 --topic kafkatest


测试获取toppid
可以在任意一台kafka服务器进行测试
节点1
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --topic kafkatest







状态说明:kafkatest有三个分区分别为1、2、3,分区0的leader是1(broker.id),分区0有三个副本,并且状态都为lsr(ln-sync,表示可以参加选举成为leader)。

+++++++++
测试删除topic,任意一节点操作
/opt/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --topic kafkatest

Topic kafkatest is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
验证是否真的删除,任意一节点操作
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --topic kafkatest



测试获取所有的topic列表,任意一节点都可操作

首先创建两个topic,节点1
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --partitions 3 --replication-factor 3 --topic kafkat1

/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --partitions 3 --replication-factor 3 --topic kafkat2

然后查看所有的topic列表,节点3
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.189.118:2181, 192.168.189.119:2181, 192.168.189.120:2181



kafka测试命令发送消息
创建一个名为messagetest的topic,任意一个节点都可操作。

创建主题:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --partitions 3 --replication-factor 3 --topic messagetest

发送消息:注意,端口是 kafka的9092,而不是zookeeper的2181
节点一操作:
创建生产者
/opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.189.118:9092,192.168.189.119:9092,192.168.189.120:9092 --topic messagetest

其他kafka服务器获取消息
创建消费者
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --topic messagetest --from-beginning ###kafka_2.11-1.0.0的版本可以用

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.189.118:9092,192.168.189.119:9092,192.168.189.120:9092 --topic messagetest --from-beginning ##kafka_2.5.0以上 版本都可以用

节点1上面,输入任意消息按回车,查看节点2(189.119),节点3(189.120) 上面能否显示消息。
节点1----->189.118,退出ctrl + d



节点2----->189.119,退出ctrl + c



节点3------>189.120,退出ctrl + c


五、安装logstash
elk02---->189.119 安装logstash
mkdir -p /data/es_soft && cd /data/es_soft
wget https://artifacts.elastic.co/downloads/elasticsearch/logstash-6.6.0.rpm
##官网下载非常慢
wget http://www.xchinagroup.top/softdown/centos7/13_elk/logstash-6.6.0.rpm
rpm -ivh logstash-6.6.0.rpm
ln -s /usr/share/logstash/bin/logstash /usr/local/bin/logstash

先不要启动
++++++++++++++
六、修改filebeat的配置文件
189.118 elk01 上面操作
vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]
########## output
output.kafka:
  hosts: ["192.168.189.118:9092","192.168.189.119:9092","192.168.189.120:9092"]
  topic: elklog

保存退出,重启filebeat systemctl restart filebeat

七、编辑logstash的配置文件
189.119 elk02 上面:
vim /etc/logstash/conf.d/kafka.conf
input {
  kafka {
    bootstrap_servers => "192.168.189.118:9092"
    topics => ["elklog"]
    group_id => "logstash"
    codec => "json"
  }
}
filter {
  mutate {
    convert => ["upstream_time","float"]
    convert => ["request_time","float"]
  }
}
output {
  if "access" in [tags] {
  elasticsearch {
    hosts => "http://192.168.189.120:9200"
    manage_template => false
    index => "nginx_acc_kafka-%{+yyyy.MM}"
  }
}
  if "error" in [tags] {
    elasticsearch {
      hosts => "http://192.168.189.120:9200"
      manage_template => false
      index => "nginx_err_kafka-%{+yyyy.MM}"
    }
  }
}

保存,退出。启动logstash
logstash -f /etc/logstash/conf.d/kafka.conf


八、验证
先模拟一些访问数据
ab -n 10 -c 10 http://192.168.189.118/xyz.html ##--->elk02
ab -n 10 -c 10 http://192.168.189.118/ ##--->elk01
ab -n 10 -c 10 http://192.168.189.118/www.html ##--->elk03

ES 上面,查看有无索引数据


登录Kibana 创建索引数据











同样的方法,创建 nginx_err_kafka-2020.10 索引数据:

kibana 查看数据:


添加要显示的字段



保存,这个查询面板,下次,可以直接打开。










保存,第二个查看面板时,要开启,另存为,再输入名称,进行保存。如果直接修改名称保存,会把刚才保存的覆盖了。