Graylog 是一个开源的日志管理和分析平台。它主要用于收集、存储、处理和分析大量的日志数据。Graylog 的核心功能包括:
- 日志收集:Graylog 可以从各种来源(如服务器、网络设备、应用程序等)收集日志数据。它支持多种输入方式,包括 Syslog、GELF、Beats、Kafka 等。
- 日志存储:Graylog 使用 Elasticsearch 作为其后端存储系统,因此它可以高效地存储和检索大量的日志数据。
- 日志处理和分析:Graylog 提供了强大的搜索和分析功能,用户可以通过查询语言对日志进行过滤、排序、聚合等操作,帮助发现问题或监控系统状态。
- 警报和通知:Graylog 可以根据预设条件触发警报,并通过电子邮件、Slack 等渠道发送通知。
- 可视化和仪表板:用户可以在 Graylog 中创建可视化的仪表板,用于实时监控和分析日志数据。
Graylog 常用于 IT 运维、安全监控、故障排查等场景,通过对日志数据的集中管理和分析,帮助企业提高系统的可观测性和问题解决能力。
一、日志收集
1.1、配置日志服务器
选择input
这里给了很多日志接收方法
试验使用kafka获取日志,
1.2、配置映射表
他作为动态数据获取,比如IP 地理位置等,根据日志某个字段的信息从XX库找到对应的数据
选择数据源
创建链接源数据实例
有很多不同格式的数据源
选择csv文件
如果我们的内容为
username ,department,email jdoe,IT,jdoe@example.com asmith,HR,asmith@example.com bwhite,Finance,bwhite@example.com johndoe,jiushi,a.com |
将内容编写为
不区分大小写,更新
创建索引表实例
选择刚刚创建的数据源实例并创建
1.3、创建字段提取器
创建,内容填写自己构建的kafka服务器地址和topic
保存成功,他会自己运行的,如果没有问题就会变成RUNNING的状态
先生产一个消息,因为要配置字段提取器
生产内容
{ "version": "1.1", "host": "webserver.local", "short_message": "User login failed", "full_message": "User johndoe failed to login to the web application", "level": 3, "_username": "johndoe", "_src_ip": "192.168.1.2"} |
查看消息
新产生的这个就是
构造字段提取
新增字段提取器
加载最近的消息
这里内容是kafka生产的消息,只作为日志,可以对每个字段配置提取。
将username 用于 Look UP配置
忘记设置映射表了。先在1,2小节创建映射表实例
选择映射表
1.4、创建告警
1.4.1、创建上报
使用简单的API上报
填充URL,并尝试发送测试数据
提示成功
查看api服务器
1.4.2、定义事件
设置上报方式,刚才创建的API方式
创建告警事件完成
1.5、测试
1.5.1、产生关于登录失败的日志
{ "version": "1.1", "host": "webserver.local", "short_message": "User login failed", "full_message": "User johndoe failed to login to the web application", "level": 3, "_username": "johndoe", "_src_ip": "192.168.1.2"} |
其中username的值为johndoe,记得之前映射表给定文件内容
我们创建映射表时,通过username查找department内容,并添加到告警自定义字段中。
产生的告警内容,我们自定义字段“renyuan”他的value通过映射表username关联到的。日志内容jdoe对应表格IT内容
附录
Input kafka不同格式示例
1. CEF Kafka
示例消息:
CEF:0|SecurityVendor|SecurityProduct|1.0|100|User Login|5|dvc=192.168.1.1 duser=johndoe src=10.0.0.1 spt=443
解释:
- CEF:0: CEF 版本号。
- SecurityVendor: 设备供应商名称。
- SecurityProduct: 设备产品名称。
- 1.0: 设备版本。
- 100: 事件 ID。
- User Login: 事件名称或描述。
- 5: 严重级别(1-10)。
- dvc=192.168.1.1: 设备 IP 地址。
- duser=johndoe: 目标用户名。
- src=10.0.0.1: 源 IP 地址。
- spt=443: 源端口。
2. GELF Kafka
示例消息:
{
"version": "1.1",
"host": "webserver.local",
"short_message": "User login failed",
"full_message": "User johndoe failed to login to the web application",
"level": 3,
"_username": "johndoe",
"_src_ip": "192.168.1.2"
}
解释:
- version: GELF 版本号。
- host: 主机名或 IP 地址。
- short_message: 简短的事件描述。
- full_message: 事件的详细描述。
- level: 日志级别(例如:3 表示错误)。
- _username 和 _src_ip: 自定义字段。
3. RAW Kafka
示例消息:
User johndoe failed to login from IP 192.168.1.2 via web application.
解释:
- RAW 格式是原始日志消息的简单文本,没有结构化的字段或元数据。消息内容完全取决于发送方。
4. Syslog Kafka
示例消息:
<134>webserver.local User login failed: johndoe from 192.168.1.2
解释:
- <134>: Syslog 优先级,计算方式为 (Facility * 8) + Severity。
- webserver.local: 发送日志的主机名。
- User login failed: johndoe from 192.168.1.2: 日志消息内容,描述发生的事件。
这些示例展示了不同格式在 Kafka 中传输的方式,供 Graylog 等日志管理系统解析和处理。
问题
1、接收到消息,但是没有输出
存在下行数据变动
查看系统日志tail -f /var/log/graylog-server/server.log &
root@a:/home/install/kafka_2.12-2.8.2# 2024-08-19T03:49:22.949Z ERROR [DecodingProcessor] Unable to decode raw message RawMessage{id=0593cd30-5dde-11ef-b23f-ac1f6b7e6aea, journalOffset=70, codec=CEF, payloadSize=9, timestamp=2024-08-19T03:49:22.947Z} on input <66c2a27b4c33336f079032fc>. 2024-08-19T03:49:22.949Z ERROR [DecodingProcessor] Error processing message RawMessage{id=0593cd30-5dde-11ef-b23f-ac1f6b7e6aea, journalOffset=70, codec=CEF, payloadSize=9, timestamp=2024-08-19T03:49:22.947Z} java.lang.NullPointerException: null at org.graylog.plugins.cef.parser.MappedMessage.<init>(MappedMessage.java:37) ~[graylog.jar:?] at org.graylog.plugins.cef.codec.CEFCodec.decodeCEF(CEFCodec.java:128) ~[graylog.jar:?] at org.graylog.plugins.cef.codec.CEFCodec.decode(CEFCodec.java:117) ~[graylog.jar:?] at org.graylog2.shared.buffers.processors.DecodingProcessor.processMessage(DecodingProcessor.java:149) ~[graylog.jar:?] at org.graylog2.shared.buffers.processors.DecodingProcessor.onEvent(DecodingProcessor.java:90) [graylog.jar:?] at org.graylog2.shared.buffers.processors.ProcessBufferProcessor.onEvent(ProcessBufferProcessor.java:90) [graylog.jar:?] at org.graylog2.shared.buffers.processors.ProcessBufferProcessor.onEvent(ProcessBufferProcessor.java:47) [graylog.jar:?] at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143) [graylog.jar:?] at com.codahale.metrics.InstrumentedThreadFactory$InstrumentedRunnable.run(InstrumentedThreadFactory.java:66) [graylog.jar:?] at java.lang.Thread.run(Thread.java:830) [?:?] |
创建了CEF格式解码器,原因为kafka生产的消息不符合格式
示例CEF格式内容
CEF:0|Security|Graylog|1.0|100|Test event|5|src=10.0.0.1 dst=10.0.0.2 spt=1232
可以通过重新创建一个syslog
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » 日志审计Graylog 使用教程-kafka收取消息
发表评论 取消回复