架构: Filebeat收集---> kafka集群 缓存---> Logstash取出----> Elasticsearch存入----> Kibana展现 Kafka: Kafka依赖于Zookeeper 两个都依赖于Java 实验环境规划: 192.168.189.118 elk01 ------>nginx filebeat kafa zookeeper 192.168.189.119 elk02 ------>logstash kafa zookeeper 192.168.189.120 elk03 ------>elasticsearch kibana kafa zookeeper 硬件资源有限,所以kafka 集群 配置在 elk01--->elk02---->elk03 上面 nginx 日志要转成json格式 三台服务器都要安装java环境 yum install java-1.8.0-openjdk.x86_64 -y +++++ 一、安装elasticsearch kibana 192.168.189.120 elk03 mkdir -p /data/es_soft && cd /data/es_soft wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.rpm ##官网下载非常慢 wget http://www.xchinagroup.top/softdown/centos7/13_elk/elasticsearch-6.6.0.rpm rpm -ivh elasticsearch-6.6.0.rpm cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak vim /etc/elasticsearch/elasticsearch.yml node.name: node-1 path.data: /data/elasticsearch path.logs: /var/log/elasticsearch network.host: 192.168.189.120,127.0.0.1 http.port: 9200 保存,退出。 mkdir -p /data/elasticsearch chown -R elasticsearch:elasticsearch /data/elasticsearch systemctl daemon-reload systemctl enable elasticsearch systemctl start elasticsearch二、安装kibana ---->192.168.189.120 elk03 cd /data/es_soft/ wget http://www.xchinagroup.top/softdown/centos7/13_elk/kibana-6.6.0-x86_64.rpm rpm -ivh kibana-6.6.0-x86_64.rpm cp /etc/kibana/kibana.yml /etc/kibana/kibana.yml.bak vim /etc/kibana/kibana.yml server.port: 5601 server.host: "0.0.0.0" server.name: "elk03" elasticsearch.hosts: ["http://localhost:9200"] kibana.index: ".kibana" 保存退出。 systemctl enable kibana systemctl start kibana
+++++++++++++ 三、安装zookeeper集群: zookeeper集群特性:整个集群中只要有超过集群数量一半的zookeeper工作是正常的,那么整个集群对外就是可用的,例如有2台服务器做了一个zookeeper,只要有任何一台故障或宕机,那么这个zookeeper集群就是不可用的了.因为剩下的一台没有超过集群的一半的数量,但是假如有三台zookeeper组成一个集群,那么损坏一台还剩两台,大于3台的一半,所以损坏一台还是可以正常运行的,但是再损坏一台就只剩下一台,集群就不可用了. 如果是4台组成,损坏一台正常,损坏两台还剩两台,不满足集群总数的一半,所以3台的集群和4台的集群算坏两台的结果都是集群不可用.所以这也是为什么集群一般是奇数的原因. +++++++++ 3.1、先把三个节点的主机名分别加入到hosts文件中,并保证能互相ping通 3.2、192.168.189.118 elk01 安装zookeeper zookeeper下载地址 http://zookeeper.apache.org/releases.html https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8.tar.gz 此次实验用的版本是3.5.8 cd /root wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8.tar.gz tar zxf apache-zookeeper-3.5.8.tar.gz -C /opt ln -s /opt/apache-zookeeper-3.5.8/ /opt/zookeeper 创建zookeeper数据目录 mkdir -p /data/zookeeper cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg vim /opt/zookeeper/conf/zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zookeeper clientPort=2181 server.1=192.168.189.118:2888:3888 server.2=192.168.189.119:2888:3888 server.3=192.168.189.120:2888:3888 保存退出。 创建myid文件 myid是zookeeper集群用来发现彼此的标识,必须创建,且不能相同。 echo "1" > /data/zookeeper/myid 这里的1需要和你的配置文件中的server.后面的数字一样 +++++配置文件说明: 2888端口号是zookeeper服务之间通信的端口 3888端口是zookeeper与其他应用程序通信的端口。 tickTime:C(clinet)-S(server)通信心跳时间 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每tickTime 时间就会发送一个心跳。 tickTime以毫秒为单位。 tickTime:该参数用来定义心跳的间隔时间,zookeeper的客户端和服务端之间也有和web开发里类似的session的概念,而zookeeper里最小的session过期时间就是tickTime的两倍。 initLimit:L(leader)-F(follower)初始通信时限 集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量) syncLimit:L(leader)-F(follower)初始通信时限 集群中的follower服务器(F)与leader服务器(L)之间 请求和应答 之间能容忍的最多心跳数(tickTime的数量) ++++++++ 3.3、另外两个节点安装zookeeper 189.119 elk02 189.120 elk03 安装zookeeper,步骤与elk01一样,只是myid不一样而已。 189.119 elk02----> myid 2 189.120 elk03----> myid 3 各节点启动zookeeper /opt/zookeeper/bin/zkServer.sh start 查看各节点状态:/opt/zookeeper/bin/zkServer.sh status
![]()
![]()
查看各节点zookeeper日志:
![]()
![]()
排错: 登录zookeeper网站 https://zookeeper.apache.org/releases.html 查找文档
![]()
从3.5.5版本开始,带有bin名称的包才是下载后可以直接使用的,里面有编译后的二进制的包,而之前的普通的tar.gz的包里面是只是源码的包无法直接使用。 3.4、重新下载,并安装。 ++++++++189.118 189.119 189.120 操作步骤一样 cd /root cp /opt/zookeeper/conf/zoo.cfg . rm -rf /opt/zookeeper rm -rf /opt/apache-zookeeper-3.5.8 wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz tar zxf apache-zookeeper-3.5.8-bin.tar.gz -C /opt/ ln -s /opt/apache-zookeeper-3.5.8-bin/ /opt/zookeeper cp /root/zoo.cfg /opt/zookeeper/conf/ +++++++++ 再次启动三个节点: /opt/zookeeper/bin/zkServer.sh start 查看各节点状态: /opt/zookeeper/bin/zkServer.sh status
zookeeper简单操作命令 连接到任意节点生成数据: 我们在节点1生成数据,然后在其他节点验证数据 /opt/zookeeper/bin/zkCli.sh -server elk02:2181
/opt/zookeeper/bin/zkCli.sh -server elk01:2181
/opt/zookeeper/bin/zkCli.sh -server elk03:2181
quit 或者 crtl + d 退出。 四、安装并测试kafka(三个节点上都要安装) kafka下载地址 http://kafka.apache.org/downloads.html cd /root && wget https://mirror.bit.edu.cn/apache/kafka/2.5.1/kafka_2.12-2.5.1.tgz tar zxf kafka_2.12-2.5.1.tgz -C /opt/ ln -s /opt/kafka_2.12-2.5.1/ /opt/kafka mkdir /opt/kafka/logs cp /opt/kafka/config/server.properties /opt/kafka/config/server.properties.bak vim /opt/kafka/config/server.properties broker.id=1 listeners=PLAINTEXT://192.168.189.118:9092 log.dirs=/opt/kafka/logs log.retention.hours=24 zookeeper.connect=192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181
![]()
![]()
保存,退出。 各节点启动kafka节点1,可以先前台启动,方便查看错误日志 /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
![]()
![]()
最后一行出现KafkaServer id和started字样,就表明启动成功了,然后就可以放到后台启动了 /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties tail -f /opt/kafka/logs/server.log ##查看日志
![]()
+++++++ 测试创建topic 创建名为kafkatest,partitions(分区)为3,replication(复制)为3的topic(主题),在任意机器操作即可 /opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --partitions 3 --replication-factor 3 --topic kafkatest
测试获取toppid 可以在任意一台kafka服务器进行测试 节点1 /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --topic kafkatest
![]()
![]()
状态说明:kafkatest有三个分区分别为1、2、3,分区0的leader是1(broker.id),分区0有三个副本,并且状态都为lsr(ln-sync,表示可以参加选举成为leader)。 +++++++++ 测试删除topic,任意一节点操作 /opt/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --topic kafkatest Topic kafkatest is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. 验证是否真的删除,任意一节点操作 /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --topic kafkatest
测试获取所有的topic列表,任意一节点都可操作 首先创建两个topic,节点1 /opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --partitions 3 --replication-factor 3 --topic kafkat1 /opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --partitions 3 --replication-factor 3 --topic kafkat2 然后查看所有的topic列表,节点3 /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.189.118:2181, 192.168.189.119:2181, 192.168.189.120:2181
kafka测试命令发送消息 创建一个名为messagetest的topic,任意一个节点都可操作。 创建主题: /opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --partitions 3 --replication-factor 3 --topic messagetest 发送消息:注意,端口是 kafka的9092,而不是zookeeper的2181 节点一操作: 创建生产者 /opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.189.118:9092,192.168.189.119:9092,192.168.189.120:9092 --topic messagetest 其他kafka服务器获取消息 创建消费者 /opt/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.189.118:2181,192.168.189.119:2181,192.168.189.120:2181 --topic messagetest --from-beginning ###kafka_2.11-1.0.0的版本可以用 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.189.118:9092,192.168.189.119:9092,192.168.189.120:9092 --topic messagetest --from-beginning ##kafka_2.5.0以上 版本都可以用 节点1上面,输入任意消息按回车,查看节点2(189.119),节点3(189.120) 上面能否显示消息。 节点1----->189.118,退出ctrl + d
节点2----->189.119,退出ctrl + c
节点3------>189.120,退出ctrl + c
五、安装logstash elk02---->189.119 安装logstash mkdir -p /data/es_soft && cd /data/es_soft wget https://artifacts.elastic.co/downloads/elasticsearch/logstash-6.6.0.rpm ##官网下载非常慢 wget http://www.xchinagroup.top/softdown/centos7/13_elk/logstash-6.6.0.rpm rpm -ivh logstash-6.6.0.rpm ln -s /usr/share/logstash/bin/logstash /usr/local/bin/logstash 先不要启动 ++++++++++++++ 六、修改filebeat的配置文件 189.118 elk01 上面操作 vim /etc/filebeat/filebeat.yml filebeat.inputs: - type: log enabled: true paths: - /var/log/nginx/access.log json.keys_under_root: true json.overwrite_keys: true tags: ["access"] - type: log enabled: true paths: - /var/log/nginx/error.log tags: ["error"] ########## output output.kafka: hosts: ["192.168.189.118:9092","192.168.189.119:9092","192.168.189.120:9092"] topic: elklog 保存退出,重启filebeat systemctl restart filebeat 七、编辑logstash的配置文件 189.119 elk02 上面: vim /etc/logstash/conf.d/kafka.conf input { kafka { bootstrap_servers => "192.168.189.118:9092" topics => ["elklog"] group_id => "logstash" codec => "json" } } filter { mutate { convert => ["upstream_time","float"] convert => ["request_time","float"] } } output { if "access" in [tags] { elasticsearch { hosts => "http://192.168.189.120:9200" manage_template => false index => "nginx_acc_kafka-%{+yyyy.MM}" } } if "error" in [tags] { elasticsearch { hosts => "http://192.168.189.120:9200" manage_template => false index => "nginx_err_kafka-%{+yyyy.MM}" } } } 保存,退出。启动logstash logstash -f /etc/logstash/conf.d/kafka.conf
八、验证 先模拟一些访问数据 ab -n 10 -c 10 http://192.168.189.118/xyz.html ##--->elk02 ab -n 10 -c 10 http://192.168.189.118/ ##--->elk01 ab -n 10 -c 10 http://192.168.189.118/www.html ##--->elk03 ES 上面,查看有无索引数据
登录Kibana 创建索引数据
![]()
![]()
![]()
![]()
同样的方法,创建 nginx_err_kafka-2020.10 索引数据: kibana 查看数据:
添加要显示的字段
保存,这个查询面板,下次,可以直接打开。
![]()
![]()
![]()
保存,第二个查看面板时,要开启,另存为,再输入名称,进行保存。如果直接修改名称保存,会把刚才保存的覆盖了。

查看各节点zookeeper日志:
排错:
登录zookeeper网站



最后一行出现

+++++++
测试创建topic
创建名为kafkatest,partitions(分区)为3,replication(复制)为3的topic(主题),在任意机器操作即可




















