ELK+KAFKA

数据流向

在这里插入图片描述

在这里nginx是生产者 logstash是消费者

filebeat就是小型的收集日志工具

1、通过nginx安装filebeat

1.1、修改filebeat的配置文件 filebeat.yml添加源码 安装的收集日志的路径和错误日志的路径

/usr/local/nginx/logs/access.log
/usr/local/nginx/logs/error.log

1.2、添加启动kafka输出并指向kafka的集群

2、在logstash设置脚本用于监控kafka集群

2.1在将kafka集群收集到的日志文件转发到logstash的两个实例当中 然后通过kabana页面展示

步骤

先查看页面是否可以正常访问

重启一下logstash

查看 KAFKA 9092 端口

在这里插入图片描述

nginx安装 filebeat

在这里插入图片描述

修改配置文件

这是二进制源码包安装的路径

在这里插入图片描述

注释

在这里插入图片描述

指向KAFKA集群

在这里插入图片描述

回到logstash

创建一个脚本 指向KAFKA集群 KAFKA

topics

在这里插入图片描述

指向两台实例手机的机器

在这里插入图片描述

第二个为error

在这里插入图片描述

input {
    kafka {
        bootstrap_servers => "20.0.0.40:9092,20.0.0.41:9092,20.0.0.42:9092"
                #kafka集群地址
        topics  => "nginx"
                #拉取的kafka的指定topic
        type => "nginx_kafka"
                #指定 type 字段
        codec => "json"
                #解析json格式的日志数据
                auto_offset_reset => "latest"
                #拉取最近数据,earliest为从头开始拉取
                decorate_events => true
                #传递给elasticsearch的数据额外增加kafka的属性数据
    }
}
output {
  if "access" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.13:9200","20.0.0.14:9200"]
      index => "nginx_access-%{+YYYY.MM.dd}"
    }
  }

  if "error" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.13:9200","20.0.0.14:9200"]
      index => "nginx_error-%{+YYYY.MM.dd}"
    }
  }

}

在后台启动 Logstash,并使用 kafka.conf 作为配置文件,同时设置了一个特定的路径来处理日期相关的文件或数据

logstash -f kafka.conf --path.date /opt/test10 &

在这里插入图片描述

./filebeat -e -c filebeat.yml

在这里插入图片描述

页面访问nginx 返回刷新即可看到

在这里插入图片描述

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部