初识Ingest Pipeline

初识Ingest Pipeline

ES在具备node.ingest:true的节点上提供了pipeline功能,可以在文档真正写入ES之前进行一些转换,插入新值,修改文档内容等功能。pipeline会拦截index和bulk请求,请求经处理后再写入ES。用户可以定义一系列的"处理器",在处理文档时,pipeline会按照processor的定义顺序执行processor。定义个格式:

PUT _ingest/pipeline/my_pipeline_id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "new"
      }
    }
  ]
}

官方示例:在PIPELINE中使用Scripts Processor

以ES 官方文档中的这个例子进行试验:https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-context-examples.html

其提供了一份数据,里面包含了歌剧的场次信息统计。我们首先要把样例数据导入到ES中。其中一条是这样:

{ "create" : { "_index" : "seats", "_type" : "seat", "_id" : "36203" } }
{ "theatre" : "Skyline", "play" : "Auntie Jo", "actors": [ "Jo Hangum", "Jon Hittle", "Rob Kettleman", "Laura Conrad", "Simon Hower", "Nora Blue" ], "date": "2018-12-14", "time": "5:40PM", "row": 11, "number": 14, "cost": 17.5, "sold": false }

我们可以看到,其内容是有问题的,_type类型在目前的版本中即将被废弃,只能是doc类型。因此如果按照原文的方法导入,是会失败的。

https://www.elastic.co/guide/en/elasticsearch/reference/7.6/mapping-type-field.html

我们需要对_type进行修改。在painless脚本中,可以使用ctx['_type']来修改插入操作的type.

ctx['_index']

The name of the index.

ctx['_type']

The type of document within an index.

增加一条ingest的处理器。

PUT _ingest/pipeline/seats
{
  "description": "seats-ingest",
  "processors": [
    {"script": {
      "lang": "painless",
      "source": """
      ctx['_type'] = "_doc";
      """
    }}
  ]
}

在kibana上尝试插入一条:

POST _bulk?pipeline=seats
{ "create" : { "_index" : "seats", "_type" : "seat", "_id" : "36203" } }
{ "theatre" : "Skyline", "play" : "Auntie Jo", "actors": [ "Jo Hangum", "Jon Hittle", "Rob Kettleman", "Laura Conrad", "Simon Hower", "Nora Blue" ], "date": "2018-12-14", "time": "5:40PM", "row": 11, "number": 14, "cost": 17.5, "sold": false }

#! Deprecation: [types removal] Specifying types in bulk requests is deprecated.
{
  "took" : 4960,
  "ingest_took" : 373,
  "errors" : false,
  "items" : [
    {
      "create" : {
        "_index" : "seats",
        "_type" : "_doc",
        "_id" : "36203",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    }
  ]
}

可以看到,是有_bulk请求插入时指定_doc类型的做法已经废弃了。可以看到_type已经被改成了_doc,而如果没有这个pipeline,结果将是_bulk请求中指定的seats类型。

修改后脚本改为如下:

PUT _ingest/pipeline/seats
{
  "description": "seats-ingest",
  "processors": [
    {"script": {
      "lang": "painless",
      "source": """
      String[] split(String str, char delimiter)
      {
        int count = 0;
        for (char c: str.toCharArray())
        {
          if (c == delimiter)
          {
            ++ count;
          }
        }
        
        if (count == 0)
        {
          return new String[] {str};
        }
        
        String[] r = new String[count + 1];
        int i0 = 0, i1 = 0, n = 0;
        for (char c: str.toCharArray())
        {
          if (c == delimiter)
          {
            r[n] = str.substring(i0, i1);
            ++n;
            i0 = i1 + 1;
          }
          ++ i1;
        }
        r[count] = str.substring(i0, i1);
        
        return r;
      }
      
      ctx['_type'] = "_doc";
      
      String[] date_array = split(ctx.date, (char)"-");
      
      String year = date_array[0].trim();
      String month = date_array[1].trim();
      if (month.length() == 1)
      {
        month = "0" + month;
      }
      String day = date_array[2].trim();
      if (day.length() == 1)
      {
        day = "0" + day;
      }
      
      boolean is_pm = ctx.time.substring(ctx.time.length() - 2).equals("PM");
      
      String[] time_array = split(ctx.time.substring(0, ctx.time.length()-2), (char)":");
      
      int hour = Integer.parseInt(time_array[0].trim());
  
      if (is_pm)
      {
        hour += 12;
      }
      String hour_str = "";
      if (hour < 10)
      {
        hour_str += "0";
      }
      hour_str += hour;
      
      int min = Integer.parseInt(time_array[1].trim());
      String min_str = "";
      if (min < 10)
      {
        min_str += "0";
      }
      min_str += min;
      
      String date_time = year + "-" + month + "-" + day + "T" + hour_str + ":" + min + ":00+08:00";
      
      ZonedDateTime dt = ZonedDateTime.parse(
         date_time, DateTimeFormatter.ISO_OFFSET_DATE_TIME);               
      ctx.datetime = dt.getLong(ChronoField.INSTANT_SECONDS)*1000L;
      ctx["_type"] = "_doc";
      """
    }}
  ]
}

对应的导入命令也做一下修改,改为:

curl -k -XPOST https://elastic:elastic@localhost:9200/_bulk?pipeline=seats -H "Content-Type: application/x-ndjson" --data-binary "@/home/DATA/seats-init.json"

之前试了好几次,都在指定输入文件这一步出错了,最后发现是需要在路径最前面加一个@

可能是我的机器不是特别好,导入时间有点长。7万多条记录花了很长时间。事实上最后导入失败了,只导入了3000多条。错误显示主分片不可访问。

对比ClickHouse,8000万条记录只花了几分钟就搞定。用来进行分析,ClickHouse各方面指标要来得更好。

对pipeline的添加,查看,修改,模拟调试API

ES 提供了一些API对ingest 的pipeline进行调试.

添加

PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}

查看

GET _ingest/pipeline/my-pipeline-id

删除

DELETE _ingest/pipeline/my-pipeline-id

模拟

POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}

也可以这样,在POST行中指定索引名称进行处理:

POST _ingest/pipeline/seats/_simulate
{
  "docs": [
    {
      "_source": {
        "theatre": "Skyline",
        "play": "Auntie Jo",
        "actors": [
          "Jo Hangum",
          "Jon Hittle",
          "Rob Kettleman",
          "Laura Conrad",
          "Simon Hower",
          "Nora Blue"
        ],
        "date": "2018-12-14",
        "time": "5:40PM",
        "row": 11,
        "number": 14,
        "cost": 17.5,
        "sold": false
      }
    }
  ]
}

如何在Pipeline中操作使用文档的数据

文档的"_source"数据

上文的例子中,我们在Pipeline中使用script处理器时,可以使用painless的内置数据结构ctx对文档source进行处理。

ctx是一个mapping对象,内部存储了包括文档的meta数据和_source数据:

ingest情况下
_index: ctx._index or ctx["_index"]
_doc: ctx._doc or ctx["_doc"]
_op: ctx._op or ctx["_op"]
xxxx: ctx.xxxx or ctx["xxxx"]

Update情况下
xxxx: ctx._source.xxxx

在不使用script的情况下,同样可以对_source中的数据进行访问。

比如说使用set处理器的情况下:

PUT _ingest/pipeline/bbb
{
  "description": "test",
  "processors": [
    {"set": {
      "field": "_source.foo",
      "value": "bar"
    }}
  ]
}

PUT _ingest/pipeline/aaa
{
  "description": "test",
  "processors": [
    {"set": {
      "field": "foo",
      "value": "bar"
    }}
  ]
}

可以直接通过数据名或者_source.数据吗进行处理。

文档的metadata

文档的metadata包括_index, _type, _routing, _id,在pipeline中均可以直接访问。

Ingest的metadata

PUT _ingest/pipeline/ccc
{
  "description": "test",
  "processors": [
    {"set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
    }}
  ]
}

不同于metadata里的成员,_ingest可以是一个_source中的合法成员,因此,访问ingest的meta数据需要使用这样的方式{{_ingest.timestamp}}。

template 中的数据与metadata

在模拟中,也可以使用pipeline,同样,访问template中的数据也需要使用{{ }}。

{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}

上面的这个set processor,将原有的 field_a 和 field_b 做了拼接之后,将其赋给了一个新字段field_c。

同时,template中也支持动态设置字段。

{
  "set": {
    "field": "{{service}}",
    "value": "{{code}}"
  }
}

要说明的是,如果想在template中使用,需要设置动态索引参数:

index.default_pipeline

The default ingest node pipeline for this index. Index requests will fail if the default pipeline is set and the pipeline does not exist. The default may be overridden using the pipeline parameter. The special pipeline name _none indicates no ingest pipeline should be run.

index.final_pipeline

The final ingest node pipeline for this index. Index requests will fail if the final pipeline is set and the pipeline does not exist. The final pipeline always runs after the request pipeline (if specified) and the default pipeline (if it exists). The special pipeline name _none indicates no ingest pipeline will run.

需要注意的是: final_pipeline是7.5版本之后才支持的。之前版本没有,比如我用的7.4版本,这个参数就不支持。

使用方法,在index的setting中指定:

  "settings": {
      "number_of_shards": 1,
      "default_pipeline": "_none"
  },

default_pipeline会被命令行中指定的pipeline覆盖,而final_pipeline会在最后执行。

条件执行

Pipeline中的每个processor都支持一个if参数,满足条件的processor才会执行。

PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network_name == 'Guest'"
      }
    }
  ]
}

这是官方文档中的一个例子,当network_name为Guest时,丢弃这条记录。

同时,if参数也支持复杂的脚本。

PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                  if (tag.toLowerCase().contains('prod')) {
                      return false;
                  }
              }
            }
            return true;
        """
      }
    }
  ]
}

编写一段painless脚本,返回true or false即可。

Processors中可以使用嵌套对象进行判断,但是为了防止产生null point exception,可以使用?.来访问成员以防止成员不存在。

PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                  if (tag.toLowerCase().contains('prod')) {
                      return false;
                  }
              }
            }
            return true;
        """
      }
    }
  ]
}

有一种processor名为pipeline,可以结合if条件指定对应的pipeline名称。

curl -X PUT "localhost:9200/_ingest/pipeline/logs_pipeline?pretty" -H 'Content-Type: application/json' -d'
{
  "description": "A pipeline of pipelines for log files",
  "version": 1,
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == \u0027apache_httpd\u0027",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == \u0027syslog\u0027",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
'

上面的pipeline processor指定了两个参数,if是该processor满足的条件,name是对应需要执行的pipeline的名称。

此外,pipeline的条件中还可以使用正则表达式。

PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url =~ /^http[^s]/",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}

异常处理

正常情况下,pipeline会顺序执行所有的processor,并在遇到第一个异常时,中断当前文档的处理。但是有时候,用户希望自定义异常的处理,这时候就需要为processor设置on_failure参数。这样当process遇到error的时候,会执行on_failure里的内容,然后继续执行下一个处理器。

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}

也可以进行索引级别的异常处理:

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar"
        ]
      }
    }
  ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}

值得注意的是,同一个异常只能被捕获一次,这个和java的异常处理机制是一致的。如果同时定义了processor和pipeline级别的异常处理模块,则异常只会被processor级别所捕获。

同时,我们也可以让processor忽略此处的错误。只要将参数ignore_failure置为true即可。

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "ignore_failure" : true
      }
    }
  ]
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容