简介

我们在使用ES的时候,经常的用法就是把其他数据源比如Mysql的数据灌到ES中。
借用ES的一些功能来提供数据的全文检索以及聚合分析之类的功能。
在这个灌数据的过程中,我们经常会对数据做一些治理,类似ETL的能力。然后把治理后的数据写入ES中。
我们当然可以自己监听MQ的数据在程序中编写代码来实现这个过程。
也可以使用logstash等中间件来实现这个功能。
这些都可以。但是ES中本身也是提供了一个能力来实现这个功能的。
而且基于这个能力你甚至可以扩展出其他的操作。
这个能力就是Ingest pipelines(摄取管道)。

这个系列我们就来看一下摄取管道的能力,并且我会分三个章节来完成。这个章节我们只来看一些他的基本用法,后面我们会依次讲解 enrich processor和自己开发插件实现管道。

一、什么是Ingest pipelines(摄取管道)

我们先来看官方文档的一些信息。本文使用的ES版本为7.17.7。官方文档的位置位于Ingest pipelines官方文档

# 英文:
Ingest pipelines let you perform common transformations on your data before indexing.
For example, you can use pipelines to remove fields, extract values from text, and enrich your data.

A pipeline consists of a series of configurable tasks called processors. 
Each processor runs sequentially, making specific changes to incoming documents. 
After the processors have run, Elasticsearch adds the transformed documents to your data stream or index.

# 中文:
采集管道可让您在索引之前对数据执行常见的转换。例如,您可以使用管道删除字段、从文本中提取值以及丰富数据。
管道由一系列可配置的任务(称为处理器)组成。每个处理器按顺序运行,对传入的文档进行特定更改。
处理器运行后,Elasticsearch 会将转换后的文档添加到您的数据流或索引中。

而且官方还有一个图示:
在这里插入图片描述
我们从描述和图示可以看到他很像我们在设计模式中说的责任链设计模式,把一个数据从源端拿到之后,在写入目标端的索引之前,经过一系列的processor处理器,最终得到目标数据,写入ES索引中。
实际上他也就是这个功能。

而且你可以使用 Kibana 的摄取管道功能或摄取 API 创建和管理摄取管道。 Elasticsearch 将管道存储在集群状态中。我们说,基本上ES中的一些操作你都可以通过kibana界面或者是DSL命令来实现。他们是等价的,只是一个有界面点点点,一个是写代码。作为一个开发,我们当然要用命令这种"高大上"的东西来实现了,界面点点点还是让那些小白去点吧,没有任何逼格。
而且命令的官方地址位于Ingest pipelines操作命令
并且我们说这个管道你建立之后,如果你想查看,你可以去集群状态中去查看,具体的操作步骤位于ES查看集群状态

OK,至此我们已经知道了他的一些概念,以及如何去在文档中定位他。下面我们就来看看如何使用。

二、Ingest pipelines基本步骤

1、节点角色

这个后面我会单独写一篇文章来说这个事,这里先提一下,如果你想使用Ingest pipelines功能。要求你的节点角色必须是Ingest,我们这次不纠结这个,因为所有的节点启动默认就有这个角色。我们这次的重点不在节点角色上。
所以我们这个问题目前不存在。

2、操作步骤

我们先来看看他的操作流程,然后我们再探究那些深入的。

2.1、创建一个索引,写入数据

我们先创建一个索引用于操作。

# 我们创建一个索引,他有两个字段,一个是text类型的message,一个是类似数组类型的tags,
# 这里之所以说类似数组,是因为ES没有专门的数组类型,这个我们后面说。
PUT my-index-01
{
  "settings": {
    "number_of_replicas": 0 # 我是单节点,设置副本为0,
  }, 
  "mappings": {
    "properties": {
      "message":{
        "type": "text"
      },
      "tags":{
        "type": "keyword"
      }
    }
  }
}

# 然后往里写两条数据
PUT my-index-01/_doc/1
{
  "message": "手机",
  "tags":  [ "xiao mi ", " hua wei" ]
}

PUT my-index-01/_doc/2
{
  "message": "语言",
  "tags":  [ "java ", " python" ]
}

# 查到此时的数据为,我们看到数据没毛病
GET my-index-01/_search
{
  "_index" : "my-index-01",
  "_type" : "_doc",
  "_id" : "1",
  "_score" : 1.0,
  "_source" : {
    "message" : "手机",
    "tags" : [
      "xiao mi ",
      " hua wei"
    ]
  }
},
{
  "_index" : "my-index-01",
  "_type" : "_doc",
  "_id" : "2",
  "_score" : 1.0,
  "_source" : {
    "message" : "语言",
    "tags" : [
      "java ",
      " python"
    ]
  }
}

但是我们发现,我写进去的数据的tags字段,有的是后面有空格,有的是前面有空格,反正就是对不上。所以我们要把他处理掉。

2.2、创建摄取管道


PUT _ingest/pipeline/my-pipeline-01
{
  "description": "我的第一个pipeline",
  "processors": [
    {
      "foreach": {
        "field": "tags",
        "processor": {
          "trim": {
            "field": "_ingest._value"
          }
        }
      }
    }
  ]
}

我们看到我创建了一个摄取管道,其中processors部分可以传入多个processor,如同我们前面说的责任链。这里每一个processor都是责任链中的一个处理节点。
我们加入了第一个processor叫做foreach,用来遍历我们的每一条数据的每一个tags字段,然后他又嵌套了一个processor叫做trim,就是对于每个tags的遍历的属性都做去除前后空格的操作,trim在java也常见。而_ingest._value是固定写法,表示的就是每一个tags中的元素。取出来做trim。
这就是一个摄取管道的最基本的要素。
这个DSL如果你觉得抽象,我可以用java伪代码来表示一下。其实就干了这么点事。

public static void main(String[] args) {
    List<MyIndex01> list = Arrays.asList(
            new MyIndex01("手机", Arrays.asList("xiao mi ", " hua wei")), 
            new MyIndex01("语言", Arrays.asList("java ", " python"))
    );
    for (MyIndex01 myIndex01 : list) {
        List<String> tags = new ArrayList<>();
        for (String tag : myIndex01.getTags()) {
            tags.add(tag.trim());
        }
        myIndex01.setTags(tags);
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
static class MyIndex01 {
    private String message;
    private List<String> tags;

}

2.2、创建一个目标索引

我们创建目标索引用来清洗我们的数据之后存进去。其实和源索引一样。

PUT my-index-01_target
{
  "settings": {
    "number_of_replicas": 0
  }, 
  "mappings": {
    "properties": {
      "message":{
        "type": "text"
      },
      "tags":{
        "type": "keyword"
      }
    }
  }
}

2.3、pipeline+reindex清洗数据

下面我们就执行reindex把我们源端索引的带着空格的不合理的数据,通过摄取管道,转换到目标索引中。

POST _reindex
{
  "source": { 
    "index": "my-index-01"
  },
  "dest": {
    "index": "my-index-01_target",
    "pipeline": "my-pipeline-01"  # 在这里指定我们的摄取管道。
  }
}

执行之后,我们可以来查看一下我们清洗之后写入目标索引my-index-01_target的数据。

GET my-index-01_target/_search
此时我们就看到,tags里面数据的前后的空格都被去掉了。
{
  "_index" : "my-index-01_target",
  "_type" : "_doc",
  "_id" : "1",
  "_score" : 1.0,
  "_source" : {
    "message" : "手机",
    "tags" : [
      "xiao mi",
      "hua wei"
    ]
  }
},
{
  "_index" : "my-index-01_target",
  "_type" : "_doc",
  "_id" : "2",
  "_score" : 1.0,
  "_source" : {
    "message" : "语言",
    "tags" : [
      "java",
      "python"
    ]
  }
}

OK,基于以上你大概知道了摄取管道如何使用,但是这只是他的一种使用方式,就是pipeline+reindex,可以从给reindex打配合,让数据从一个索引经过摄取管道清洗,然后转移到另一个索引。

2.4、直接写入

还记得我们一开始说的那个话题吗,我们在写入数据的时候,就想清洗之后再写入,而不是写完了再转移。这时候pipeline也可以使用。

POST my-index-01/_doc?pipeline=my-pipeline-01
{
  "message": "电脑",
  "tags": [ "weiruan    ", "  pingguo" ]
}

我们在写入数据的时候可以直接作用,不需要reindex。完全取决于你的场景。
我们来查看一下这个数据,我们看到没毛病。

{
  "_index" : "my-index-01",
  "_type" : "_doc",
  "_id" : "q0qiEJEBqSzQT13RTnJx",
  "_score" : 1.0,
  "_source" : {
    "message" : "电脑",
    "tags" : [
      "weiruan",
      "pingguo"
    ]
  }
}

其余诸如_update_by_query,update数据都可以配合使用。具体都可以参考官网。

2.5、查看摄取管道

见操作文档摄取管道操作

2.6、修改摄取管道

修改很简单,你原来是啥样的,然后改一下,在创建一遍,他就会把你同名的摄取管道给覆盖掉,达到修改的目的。

2.7、删除摄取管道

见操作文档摄取管道操作

2.8、测试摄取管道

这个我单独说一下,他的作用就是一个测试而不真正给你执行创建,就是模拟了一下,但是没有实际执行。
位置位于Simulate pipeline API

三、内置管道

es内部为我们内置了很多管道,我们可以开箱即用,实际上上文中我们使用的foreach和trim本身就是ES内置的摄取管道。
而这些内置管道可以参考文档位置为ES内置摄取管道文档位置
在这里插入图片描述
从图中可以看到有这么多,这我还没截全了。实际上比图中的要多。那么我们这里来尝试几个比较常用的,其余的你看文档可以自己上手。

0、Foreach processor

这个我们上面说过了,就是一个遍历数组元素的作用。

1、Append processor

Append processor文档位置
我们来看官网对他的介绍:

Appends one or more values to an existing array if the field already exists and it is an array. 
Converts a scalar to an array and appends one or more values to it if the field exists and it is a scalar. 
Creates an array containing the provided values
 if the field doesn’t exist. Accepts a single value or an array of values.

文档的大意就是如果字段已存在并且是数组,则将一个或多个值追加到现有数组。
将标量转换为数组,如果该字段存在并且它是标量,则向其中追加一个或多个值。
如果该字段不存在,则创建一个包含提供的值的数组。接受单个值或值数组。

怎么解释呢,官网永远不说人话,我们来解释一下。
数组:就是我们前面用的那个tags那种类型。
标量:就是非复杂类型的,比如字符串这种。
所以翻译一下这段话的意思就是:
1、如果你索引里面存在这个字段了,那么就看你这个字段是啥类型的。
如果是数组类型,那么Append processor的作用就是给你这个数组add一个值进去。
如果不是数组类型,是简单类型,那就会把你的这个简单类型转换成数组类型。然后再给你添加进去。

2、如果你索引里面不存在我操作这个字段,那直接就是给你创建成数组类型的,写进去的也是数组格式。

于是你有了这个概念之后我们就来试试,是不是这样呢。

  • 我们还用我们之前的索引my-index-01
PUT my-index-01
{
  "settings": {
    "number_of_replicas": 0
  }, 
  "mappings": {
    "properties": {
      "message":{
        "type": "text"
      },
      "tags":{
        "type": "keyword"
      }
    }
  }
}

我们来看一下这个管道的语法如下:Append processor

{
  "append": {
    "field": "tags",
    "value": ["production", "{{{app}}}", "{{{owner}}}"]
  }
}

我来解释一下,“field”: “tags”,就是你要操作的字段。你要给哪个字段添加内容,这里就写哪个字段。
“value”: [“production”, “{{{app}}}”, “{{{owner}}}”]这一句里面有两种属性,一个是"production",表示你要给每个tags里面添加一个production字符串,而"{{{app}}}“这种语法表示你要把你索引里面的app字段的内容也添加进去。如果没有这个app字段,那添加进去的就是个空字符串”"。
我们来试试。

1.1、给数组字段使用Append processor

PUT _ingest/pipeline/my-pipeline-01
{
  "description": "我的第一个pipeline",
  "processors": [
    {
      "append": {
        "field": "tags",
        "value": [
          "levi",
          "{{{message}}}"
        ]
      }
    }
  ]
}

我们给tags这个数组类型的字段添加内容,添加的值是字符串"levi",同时我们还给tags字段添加了message属性的值。
此时我们写入一条数据。我们预期的就是我们这条数据加进去之后,tags里面除了本身的"wuya", “zhuomuniao” ,还有我们摄取管道添加的"levi"以及他自己的message字段的值"鸟类"。

POST my-index-01/_doc?pipeline=my-pipeline-01
{
  "message": "鸟类",
  "tags": [ "wuya", "zhuomuniao" ]
}

我们来看一下这个数据。发现没毛病。
在这里插入图片描述

1.2、给标量字段使用Append processor

标量字段就是我们的message这个字符串字段。我们按照上面的理论可以知道,当我们写入的时候,他会把我们这个字符串字段转变为数组类型。并且把值写进数组。

PUT _ingest/pipeline/my-pipeline-02
{
  "description": "我的第二个pipeline",
  "processors": [
    {
      "append": {
        "field": "message", # 操作的标量字段
        "value": [
          "levi",
          "{{{message}}}"
        ]
      }
    }
  ]
}

我们这里是给message字段添加值,而且添加的内容是"levi"字符串,以及message本身这个内容,等于是把message添加两次。


POST my-index-01/_doc?pipeline=my-pipeline-02
{
  "message": "人类",
  "tags": [ "man", "woman" ]
}

在这里插入图片描述

我们看到,他把我们的message转成数组了。而且确实message里面的人类被写进去两次。

1.3、Append processor操作一个不存在的字段

PUT _ingest/pipeline/my-pipeline-03
{
  "description": "我的第三个pipeline",
  "processors": [
    {
      "append": {
        "field": "message",
        "value": [
          "production",
          "{{{123}}}"
        ]
      }
    }
  ]
}

我们操作了一个不存在的字段123,这时候我们看看会发生什么。
在这里插入图片描述
其实还是符合我们的预期的。

2、Date processor

Date processor文档位置
我们来看官网对他的介绍:

Parses dates from fields, and then uses the date or timestamp as the timestamp for the document. 
By default, the date processor adds the parsed date as a new field called @timestamp. 
You can specify a different field by setting the target_field configuration parameter. 
Multiple date formats are supported as part of the same date processor definition. 
They will be used sequentially to attempt parsing the date field, in the same order they were defined as part of the processor definition.

意思就是:
解析字段中的日期,然后使用日期或时间戳作为文档的时间戳。
默认情况下,日期处理器将解析的日期添加为名为 @timestamp 的新字段。
您可以通过设置 target_field 配置参数来指定不同的字段。支持多种日期格式作为同一日期处理器定义的一部分。
它们将按顺序使用来尝试解析日期字段,其顺序与它们被定义为处理器定义的一部分的顺序相同。

换句话说就是他会把你数据中的某个表示日期的字段,给解析成为一个你指定的日期格式的值,
然后赋值给你指定的那个字段。如果你没指定这个字段,那就他会给你自己创建一个叫做@timestamp的字段。
并且这个转换的日期格式可以是多种,实际上我们一般只需要一种,整个系统应该统一日期格式,不然未来
整出什么花活来就难受了。

3、Set processor

Set processor文档位置
我们来看官网对他的介绍:

Sets one field and associates it with the specified value. If the field already exists,
 its value will be replaced with the provided one.

意思就是:
设置一个字段并将其与指定值关联。如果该字段已存在,则其值将替换为提供的值。
换句话说就是为某一个字段设置值,这个值可以是你指定写死的,也可以是来自索引中的某个其他字段。

3.1、创建一个索引

PUT my-index-01
{
  "settings": {
    "number_of_replicas": 0
  }, 
  "mappings": {
    "properties": {
      "firstname":{
        "type": "keyword"
      },
      "lastname":{
        "type": "keyword"
      },
      "fullname":{
        "type": "keyword"
      }
    }
  }
}

3.2、创建set processor

PUT _ingest/pipeline/my-set-pipeline-01
{
  "description": "我的第一个set pipeline",
  "processors": [
    {
      "set": {
        "field": "fullname",
        "value": "{{{lastname}}} {{{firstname}}}  ok"
      }
    }
  ]
}

我们为我们的fullname字段通过set processor设置了值,值的内容分别是文档中的lastname字段和firstname字段的值拼起来的。并且我还在后面固定了一个字符串ok。

3.3、通过摄取管道写入数据

POST my-index-01/_doc?pipeline=my-set-pipeline-01
{
  "firstname": "levi",
  "lastname": "jack"
}

查看结果:符合我们的预期。
在这里插入图片描述

4、Script processor

这是我们最后要介绍的一个摄取管道,其余的后面有时间慢慢补上,这个比较强大,我们就来看看这个。
Script processor官方文档
我们来看官网对他的介绍:

Runs an inline or stored script on incoming documents. The script runs in the ingest context.

The script processor uses the script cache to avoid recompiling the script for each incoming document. 
To improve performance, ensure the script cache is properly sized before using a script processor in production.

意思就是:
对传入文档运行内联或存储的脚本。该脚本在摄取管道上下文中运行。
脚本处理器使用脚本缓存来避免为每个传入文档重新编译脚本。
为了提高性能,请在生产中使用脚本处理器之前确保脚本缓存的大小正确。

换句话说,以前我们那些管道都是一些内置的,你无外乎就是配置一些参数,但是这个不一样,这个你可以编辑
一些脚本逻辑来控制这个生效逻辑。

他的可选参数如下:其实其他管道也有可选参数,大差不差,之所以前面的没写是因为不难,看文档都能看懂,这个我这里写一下。
在这里插入图片描述
我们先来使用一下,实际上这里很多东西都和脚本有关,具体可以参考文档。

4.1、创建一个索引

PUT my-index-01
{
  "settings": {
    "number_of_replicas": 0
  }, 
  "mappings": {
    "properties": {
      "tags":{
        "type": "keyword"
      }
    }
  }
}

索引很简单,就是一个tags,我们里面准备存一些数组类型。

4.2、创建摄取管道

PUT _ingest/pipeline/my-script-pipeline-01
{
  "description": "我的第一个script pipeline",
  "processors": [
    {
      "script": {
        "description": "把tags中包含huawei的元素移除",
        "lang": "painless",
        "source": """
           def tags = ctx['tags'];
           if(tags != null){
             for (int i = 0; i < tags.length; i++) {
               if(tags[i] == params.removetag){
                 tags.remove(i);
               }
             }
           }
          """,
        "params": {
          "removetag": "huawei"
        }
      }
    }
  ]
}

你可以看到我们这个脚本实际上和java很像,他的ctx就是你每条数据过管道的时候他获取到这个数据上下文,然后取出你的tags字段遍历,发现和参数一样的就移除。

4.3、通过摄取管道写入数据

POST my-index-01/_doc?pipeline=my-script-pipeline-01
{
  "tags": ["xiaomi","huawei","meizu"]
}

查看数据我们看到huawei确实被移除了。
在这里插入图片描述

四、来个需求?

1、需求梳理

在这个实操中,我们会用java结合DSL的方式来实现实际开发中如何使用摄取管道的操作。

假如此时我们有个需求,客户提供了一个数据表是mysql表叫做newstable,领导要你把这个表接进来,然后写入你的ES。
这个表就两个字段,一个是name一个是tags

但是呢,他的表里面,
name字段有的前后有空格,有的还各种大小写不统一,领导要你都变成小写。
最离谱的是name字段里面的内容有html标签,这你完全不能忍,展示出来成啥了。
tags字段,你想在你的ES里面存成数组,结果mysql还是用逗号隔开的字符串。
而且tags字段数据是有一些敏感词,对于你们公司来说"tnd"这属于敏感词,你要给他过滤掉。

基于这样的"脏数据",你要在数据进入ES之前,给他治理一波。
开发之前,我们先来梳理一下这些需求的点:

1、前后有空格,问题不大,我们用trim processor来去掉。
2、大小写不统一,都要变为小写。问题也不大,我们用Lowercase processor来处理。
3、还有一个tags字段,你要存数组,但是源端是逗号隔开的字符串,你要切割开,我们使用Split processor。
4、html标签这个问题也不大,我们用HTML strip processor来处理。
5、敏感词过滤,我们用Script processor来处理移除掉。

于是我们选好了我们使用的管道之后,我们就来进行开发,我的环境很简单。

2、工程环境

1、jdk1.8
2、springboot2.3.4.RELEASE
3、maven 3.6.1
4、elasticsearch 7.17.7

以下是我的pom依赖配置。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
    </parent>
    <groupId>com.leiv</groupId>
    <artifactId>springboot-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-demo</name>
    <description>springboot-demo</description>
    <packaging>jar</packaging>
    <properties>
        <java.version>1.8</java.version>
        <elasticsearch.version>7.17.7</elasticsearch.version>
    </properties>

    <dependencies>
    
        <!-- web场景启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--引入ES客户端依赖-->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

我的yaml文件配置。

elasticsearch:
  nodes: 127.0.0.1:9200
  cluster-name: my-application
  cluster-port: 9300

好了,我们已经有了环境,就可以进行开发了。

3、代码编写

3.1、es客户端类配置

/**
 * 方法描述:es基础配置绑定,映射配置文件
 */
@Data
@ConfigurationProperties(prefix = "elasticsearch")
public class EsConfigProperties {
    private String clusterName;
    private String[] nodes;
    private Integer clusterPort;
}

@Slf4j
@Configuration
@EnableConfigurationProperties(EsConfigProperties.class)
public class ElasticsearchConfiguration {

    private EsConfigProperties esConfigProperties;
    
 	public ElasticsearchConfiguration(EsConfigProperties esConfigProperties) {
        this.esConfigProperties = esConfigProperties;
    }

    /**
     * 方法描述:初始化es客户端 因为我们还没做安全管理,所以这里使用无密码的客户端
     * @return
     */
    @Bean(destroyMethod = "close",name = "restHighLevelClient")
    public RestHighLevelClient initRestClient() {
        RestHighLevelClient restHighLevelClient = null;
        try {
            String[] nodes = esConfigProperties.getNodes();
            if(Objects.isNull(nodes) || nodes.length == 0){
                log.info("es客户端初始化失败,未发现es节点配置{}", JSON.toJSONString(nodes));
            }
            List<HttpHost> hostList = new ArrayList<>();
            for (String node : nodes) {
                String[] nodeAttr = node.split(":");
                HttpHost httpHost = new HttpHost(nodeAttr[0], Integer.parseInt(nodeAttr[1]));
                hostList.add(httpHost);
            }
            HttpHost [] httpHostArray = hostList.toArray(new HttpHost[]{});
            RestClientBuilder builder = RestClient.builder(httpHostArray).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                    return httpAsyncClientBuilder;
                }
            });
            restHighLevelClient = new RestHighLevelClient(builder);
            log.info("ES高级客户端初始化完成......");
        }catch (Exception e){
            e.printStackTrace();
            log.info("ES高级客户端初始化失败{}",e.getMessage());
        }
        return restHighLevelClient;
    }

    /**
     * 方法描述:解决es和netty版本冲突问题
     */
    @PostConstruct
    void init() {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
    }

}

3.2、在kibana创建管道

实际上这种东西你也可以用java客户端来做,但是一般我们用脚本创建好保存起来就行,java代码有了问题你想改还得发版,所以我们选在在kibana端编写DSL。
我们把之前的梳理搬出来看看我们要创建哪几个管道。

1、前后有空格,问题不大,我们用trim processor来去掉。
2、大小写不统一,都要变为小写。问题也不大,我们用Lowercase processor来处理。
3、还有一个tags字段,你要存数组,但是源端是逗号隔开的字符串,你要切割开,我们使用Split processor。
4、html标签这个问题也不大,我们用HTML strip processor来处理。
5、敏感词过滤,我们用Script processor来处理移除掉。

根据这几个需求我们去挨个创建管道,注意哈,我们的管道是一个链,你完全可以在一个pipline里面配置多个管道,他会让你的每条数据,挨个去执行这些管道的。pipline翻译就是流水线,其实见名知义了。

我们编写我们的管道流水线。注意,我们的管道顺序编织是有顺序的,我们选择先去除html标签,在转小写,因为如果你先转小写可能把html标签给转成别的了。这样会影响你的处理,所以我们在编写的时候要考虑执行顺序带来的影响。

PUT _ingest/pipeline/my-etl-newstable-pipeline
{
  "description": "我的清洗newstable表数据的pipeline",
  "processors": [
    {
      "html_strip": {
        "field": "name"
      }
    },
    {
      "trim": {
        "field": "name"
      }
    },
    {
      "lowercase": {
        "field": "name"
      }
    },
    {
      "split": {
        "field": "tags",
        "separator": ",",
        "preserve_trailing": true
      }
    },
   
    {
      "script": {
        "description": "把tags中包含tmd,tnd的元素移除",
        "lang": "painless",
        "source": """
           def tags = ctx['tags'];
           if(tags != null){
             for (int i = 0; i < tags.length; i++) {
               if(tags[i] == params.tndparam){
                 tags.remove(i);
               }
               
             }
           }
          """,
        "params": {
          "tndparam": "tnd"
        }
      }
    }
  ]
}

所以我们最后的处理流程就是:
在这里插入图片描述
OK,至此,我们的管道就创建完毕了。我们接下来模拟在接口中写入数据,使用该管道。

3.3、创建索引

创建我们要写入的索引。

PUT newstable-index
{
  "settings": {
    "number_of_replicas": 0
  }, 
  "mappings": {
    "properties": {
      "name":{
        "type": "keyword"
      },
      "tags":{
        "type": "keyword"
      }
    }
  }
}

3.4、创建实体类和接口

// 模拟mysql表的实体映射
@Data
@AllArgsConstructor
@NoArgsConstructor
public class NewsTableEntity {
	// mysql中的name映射
    private String name;
    // mysql中的tags映射
    private String tags;
}

@RestController
@RequestMapping("/doc/es")
public class EsController {

    @Autowired
    RestHighLevelClient restHighLevelClient;

    // 写入的ES索引名称
    private static final String INDEX_NAME = "newstable-index";
    // 指定pipeline名称,这里要和你在kibana创建的pipeline对上
    private static final String PIPELINE_NAME = "my-etl-newstable-pipeline";

    @GetMapping("/addDoc")
    public boolean addDoc() throws IOException {
        List<NewsTableEntity> tableEntities = new ArrayList<>();
        // 模拟mysql表数据,name字段都是有html标签的,而且前后都有空格,而且tags字段都是逗号分隔的字符串,有的还有敏感词tnd
        tableEntities.add(new NewsTableEntity(" <H1> Hello world!</H1> ","HUAWEI,小米,苹果,tnd"));
        tableEntities.add(new NewsTableEntity(" <P>  你好</P> ","su7,bmw"));

        // 这里构造批量请求,你单个请求也一样的,实际开发一般都是批量,性能好一些
        BulkRequest bulkRequest = new BulkRequest();
        for (NewsTableEntity tableEntity : tableEntities) {
            IndexRequest request = new IndexRequest(INDEX_NAME)
                    .source(JSONObject.toJSONString(tableEntity), XContentType.JSON)
                    // 指定pipeline,这里只是示例,实际开发中根据业务需求来
                    .setPipeline(PIPELINE_NAME);
            bulkRequest.add(request);
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        return bulkResponse.hasFailures();

    }
}

3.5、测试接口

在这里插入图片描述
查看ES中的数据,发现我们已经清洗成功了。name去除了空格和html标签,tags变为了数组,并且移除了敏感词。
在这里插入图片描述

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部