来源:https://community.hortonworks.com/articles/30291/adding-new-telemetry-data-source-to-apache-metron.html
简短描述:
本教程将指导您如何收集/接收Metron,然后配置Metron来解析、索引、持久化和查看它们。
当在Metron中添加一个新的数据源时,第一步是决定如何将事件从新的遥测数据源推送到Metron。您可以使用许多数据收集工具,而这个决策与Metron是分离的。但是,我们推荐评估Apache Nifi,因为它是一个很好的工具(本文使用Nifi将数据推入Metron)。第二步是配置Metron来解析自动测试记录数据源,以便对其进行下游处理。在本文中,我们将介绍如何执行这两个步骤。
在本系列的前一篇文章中,我们描述了希望将Squid自动测试记录数据源添加到Metron的Foo客户的以下需求。
1、来自Squid日志的代理事件需要实时接收;
2、必须将代理日志解析为Metron可以理解的标准JSON结构;
3、实时地,需要对squid代理事件进行丰富,使所域名信息丰富IP信息;
4、在实时的情况下,在代理事件中的IP必须与intel威胁情报进行检查;
5、如果与有英特尔威胁情报匹配,需要提高警报;
6、最终用户必须能够看到新的自动测试记录事件和来自新数据源的警报。
7、所有这些需求都需要在不编写任何新的java代码的情况下轻松实现。
在本文中,我们将介绍如何执行步骤1、2和6。
一、如何将Squid自动测试记录数据源解析到Metron
以下步骤指导您如何添加这个新的自动测试记录技术。
步骤1:开启单个节点的 Vagrant VM
1、从https://github.com/apache/incubator-metron/archive/codelab-v1.0.tar.gz下载代码。
2、untar文件(tar -zxvf incubat -metron-codelab-v1.0.tar.gz)。
3、导航到metron-platform目录并构建包:incubator-metron-codelab-v1.0/metron-platform并构建它(mvn clean包-DskipTests=true)
4、导航到codelab-platform目录:incubat -metron-codelab-v1.0/metron-deployment/vagrant/codelab-platform/
5、按照这里的说明:https://github.com/apache/incubator-metron/tree/codelab-v1.0/metron-deployment/vagrant/codelab-platform。注意:Metron开发映像被命名为launch_imagesh,而不是launch_dev_image.sh。
步骤2:为新的数据源创建Kafka主题
您要以流方式输入到Metron的每个数据源都必须有自己的Kafka主题。选择的接收工具(例如Apache Nifi)将事件推入Kafka主题。
1、ssh到您的虚拟机
1、vagrant ssh
1、在目录/usr/hdp/current/kafka-broker/bin/中创建一个名为“squid”的Kafka主题:
2、列出所有Kafka主题,以确保新主题存在:
Kafka主题如下所示:
bro
enrichment
pcap
snort
squid
yaf
第3步:安装Squid
1、安装并启动Squid:
2、开始使用Squid时,查看创建的不同日志文件:
您可以看到有三种日志可用:access.log、cache.log和squid.out。我们对access.log感兴趣,它是记录代理使用情况的日志。
3、起初,access.log是空的。让我们为日志生成一些条目,然后列出access.log的新内容。“- h 127.0.0.1”表示squidclient只使用IPV4接口。
在生产环境中,您可以将用户web浏览器配置为指向代理服务器,但是为了简化本教程,我们将使用与Squid安装打包的客户机。在我们使用客户端模拟代理请求后,Squid日志条目应该如下所示:
4、使用Squid日志项目,我们可以确定日志的格式,即:
步骤4:创建一个Grok语句来解析squid自动测试记录事件
现在我们准备处理Metron解析拓扑设置。
1、我们需要做的第一件事是决定在新的自动测试记录中使用基于java的解析器还是基于grok的解析器。在本例中,我们将使用Grok解析器。Grok解析器非常适合于理解良好(检查)的结构化或半结构化日志和流量较小(检查)的自动测试记录日志。
2、接下来,我们需要定义日志的Grok表达式。有关更多细节,请参阅Grok文档。在我们的例子中,模式是:
在定义squid日志模式之前,请注意WDOM模式(更适合于Squid,而不是使用通用的Grok URL模式)。这是可选的,为便于使用而做。另外,请注意,我们将不希望包含在结果JSON结构中的任何消息的部分应用不需要的标记。最后,请注意,通过引用以下字段约定列表,我们将命名约定应用到IPV4字段。
3、我们需要做的最后一件事是验证Grok模式以确保它是有效的。对于我们的测试,我们将使用一个名为Grok构造函数的免费Grok验证器。验证过的Grok表达式应该如下:
4、既然已经定义了Grok模式,我们需要保存它并将它移动到HDFS。在tmp目录中创建一个名为“squid”的文件,并将Grok模式复制到该文件中。
5、现在将squid文件放入Metron存储Grok解析器的目录中。现有的带Metron的Grok解析器是在/apps/metron/patterns/下进行的。
步骤5:为新的Squid Storm解析器拓扑创建流量配置
1、既然Grok模式是在HDFS中分阶段的,我们需要为Metron解析拓扑定义Storm Flux配置。配置在/usr/metron/0.1BETA/config/topologies/下分阶段,每个解析拓扑都有自己的配置集。拓扑的每个目录都有一个remote.yaml,该remote.yaml设计为在AWS上运行,而“local/test.yaml”设计为在单节点VM上本地运行。由于我们将在VM上本地运行,所以需要为Squid定义test.yaml。最简单的方法是复制一个现有的基于grok的configs (YAF),并为squid定制它。
2、并编辑您的配置,看上去像这样(用squid代替yaf,并替换“constructorArgs”部分):
name: "squid"
config:
topology.workers: 1
components:
- id: "parser"
className: "org.apache.metron.parsers.GrokParser"
constructorArgs:
- "/apps/metron/patterns/squid"
- "SQUID_DELIMITED"
configMethods:
- name: "withTimestampField"
args:
- "timestamp"
- id: "writer"
className: "org.apache.metron.parsers.writer.KafkaWriter"
constructorArgs:
- "${kafka.broker}"
- id: "zkHosts"
className: "storm.kafka.ZkHosts"
constructorArgs:
- "${kafka.zk}"
- id: "kafkaConfig"
className: "storm.kafka.SpoutConfig"
constructorArgs:
# zookeeper hosts
- ref: "zkHosts"
# topic name
- "squid"
# zk root
- ""
# id
- "squid"
properties:
- name: "ignoreZkOffsets"
value: true
- name: "startOffsetTime"
value: -1
- name: "socketTimeoutMs"
value: 1000000
spouts:
- id: "kafkaSpout"
className: "storm.kafka.KafkaSpout"
constructorArgs:
- ref: "kafkaConfig"
bolts:
- id: "parserBolt"
className: "org.apache.metron.parsers.bolt.ParserBolt"
constructorArgs:
- "${kafka.zk}"
- "squid"
- ref: "parser"
- ref: "writer"
streams:
- name: "spout -> bolt"
from: "kafkaSpout"
to: "parserBolt"
grouping:
type: SHUFFLE
步骤6:部署新的解析器拓扑
现在我们已经定义了squid解析器拓扑,让我们将它部署到集群中。
1、部署新的squid paser拓扑:
2、如果你目前在storm中有四种拓扑,你需要杀死一个来让一个worker可以使用squid。要做到这一点,请从Storm UI中单击要在topology Summary部分中杀死的拓扑的名称,然后单击topology Actions下的kill。storm将杀死这个拓扑结构,并使一个worker为squid服务。
3、转到Storm UI,您现在应该看到新的“squid”拓扑,并确保拓扑没有错误。
3、这个squid处理器拓扑将从我们之前创建的squid Kafka主题中获取,然后使用我们前面定义的Grok模式,用Metron的Grok框架解析事件。解析的结果是一个标准的JSON Metron结构,然后被放到“丰富”Kafka主题中进行进一步处理。
但是access.log中的squid事件如何被放到“squid”Kafka主题中,比如解析器拓扑就可以解析它呢?我们将使用Apache Nifi实现这一点。
二、使用Apache Nifi将数据流到Metron
简单地说,NiFi的构建是为了自动化系统之间的数据流。因此,它是一个收集、摄取和推送数据到Metron的极好工具。下面关于如何安装配置和创建nifi流以将squid事件推入Metron的说明。
2.1安装、配置和启动Apache Nifi
下面展示了如何在VM上安装Nifi。以root为权限完成下列操作:
1、下载 Nifi:
2、编辑Nifi配置以更新Nifi web应用程序的端口:Nifi .web.http.port=8089
3、将Nifi安装为服务
4、启动Nifi服务
5、访问Nifi Web: http://node1:8089/ Nifi /
2.2创建一个Nifi流,将事件流传输到Metron
现在我们将创建一个流来捕获squid的事件并将它们推入metron
1、将处理器拖动到画布上(通过拖动处理器图标来完成)。第一个图标)
2、搜索尾文件处理器并选择Add.右键单击处理器并配置。在设置选项卡中,将名称更改为“Ingest Squid Events”
在属性中,配置如下所示:
3、在画布上拖动另一个处理器
4、搜索PutKafka并选择Add
5、右键单击处理器并配置。在设置中,将名称更改为“Stream to Metron”,单击复选框查看关系的失败和成功
6、在属性下,设置3个属性
经纪人:node1:6667
主题名称:squid
客户名称:nifi-squid
7、通过拖动箭头从Ingest Squid事件流到Metron创建一个连接
8、选择整个流程并单击play按钮(播放按钮)。你应该看到所有的处理器都是绿色的,如下所示:
9、使用squidclient生成一些数据(对大约20多个站点执行此操作)
10、您应该看到处理器上的指标被推入Metron
11、查看解析器拓扑的Storm UI,您应该会看到tuple出现
12、在大约5分钟后,您应该会看到一个新的 Elastic Search索引,名为squid_index*,在 Elastic Search管理UI中。
三、验证事件索引
按照惯例,将对新消息进行索引的索引称为squid_index_[timestamp],文档类型为squid_doc。
为了验证消息是否被正确索引,我们可以使用Elastic Search head插件。
1、安装的插件:
您应该看到消息:已安装mobz/elasticsearch-head/1.x 到 /usr/share/elasticsearch/plugins/head
2、浏览到 elastic head UI: http://node1:9200/_plugin/head/
3、单击浏览器选项卡,在左侧面板中选择squid doc,然后选择一个示例文档。您应该看到如下内容:
四、配置Metron UI以查看squid自动测量记录事件
现在我们已经配置了Metron来解析、索引和持久化自动测量记录事件,并且Nifi将数据推送到Metron,现在让我们在Metron UI中可视化这个流式自动测量记录数据。
1、转到Metron UI
2、添加新的固定查询
1、单击+添加新的固定查询
2、创建一个查询:_type: squid_doc
3、单击彩色圆圈图标,命名保存的查询并单击Pin。见下文
3、为squid事件添加一个新的直方图面板
1、单击add panel +图标
2、选择直方图面板类型
3、标题为“squid事件”
4、将时间字段更改为:时间戳
5、配置跨到12
6、在查询下拉菜单中选择“选择”,并只选择“squid事件”固定查询
7、单击Save,应该看到直方图中的数据