将 Geometry 和 JsonB 数据从 Postgres 迁移到来自 logstash JDBC Input 插件的 ElasticSearch

这个故事解释了如何将 Postgres 表中的数据迁移到弹性搜索,包括使用 logstash 的几何和 JSONB 数据。最近,我开始在我们的应用程序用例中使用 Elastic 搜索,以避免来自微服务的跨数据连接。

Logstash 帮助我们将来自不同来源的数据播种到 Elastic 源索引。在这里,我们的源是 Postgres 数据库实例。我们有广泛的用例,其中之一是将几何和 JSONB 数据迁移到 Elastic Search 索引。

作为logstash的新手,我不得不花几个小时来破解这个问题。花了几个小时来了解 JDBC 输入插件和 logstash 中的其他过滤器。我在迁移特殊类型(几何和 JSONB)方面没有得到谷歌的足够支持。所以我想分享我的过期时间。
什么是logstash?

无论格式或复杂性如何,Logstash 都会动态摄取、转换和发送您的数据。使用 grok 从非结构化数据中派生结构,从 IP 地址破译地理坐标,匿名或排除敏感字段,并简化整体处理。

随着数据从源传输到存储,Logstash 过滤器解析每个事件,识别命名字段以构建结构,并将它们转换为聚合为更强大的分析和业务价值的通用格式。
无论格式或复杂性如何,Logstash 都会动态转换和准备您的数据:

  • 使用 grok 从非结构化数据中导出结构
  • 从 IP 地址破译地理坐标
  • 匿名 PII 数据,完全排除敏感字段
  • 简化整体处理,独立于数据源、格式或架构。

这是用例。

image.png

如何在本地安装?

为了在本地拥有它,我为 ELK 堆栈(弹性搜索和 Kibana)创建了一个 docker-compose 文件。后来在本地安装了logstash。

用于 ES 的 Docker-compose 文件和用于 UI 的 Kibana。

version: '2.2'
services:
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic

  es02:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data02:/usr/share/elasticsearch/data
    networks:
      - elastic

  es03:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic

  kib01:
    image: docker.elastic.co/kibana/kibana:7.12.1
    container_name: kib01
    ports:
      - 5601:5601
    environment:
      ELASTICSEARCH_URL: http://es01:9200
      ELASTICSEARCH_HOSTS: '["http://es01:9200","http://es02:9200","http://es03:9200"]'
    networks:
      - elastic

volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

ES 总共考虑了三个节点。

我已使用此链接在本地安装和启用 logstash。

将数据从 Postgres 迁移到 Elastic Search

为此,我们必须使用logstash。在此之前,让我们检查一下数据库。

我已经按名称创建了一个表,**users**其中包含几何和 JSONB 数据类型。

CREATE TABLE public.users (
   user_id uuid NOT NULL DEFAULT uuid_generate_v4(),
   first_name text NOT NULL,
   last_name text NULL,
   date_of_birth date NULL,
   city text NULL,
   country text NOT NULL,
   location geometry NOT NULL,
   created_at timestamp NOT NULL DEFAULT now(),
   updated_at timestamp NULL,
   additional_data jsonb NOT NULL DEFAULT '{}'::jsonb,
   CONSTRAINT users_pkey PRIMARY KEY (user_id)
);

我插入了一些大约 500 条记录的测试数据,只是为了测试 logstash 和 ES。


image.png

Logstash,一个数据管道通常包含三个部分:输入、过滤器和输出。对于我们的用例

输入 → RDBMS(JDBC 插件)

过滤器 → 我们使用 Ruby 和 mutate 过滤器。

输出 → 弹性搜索

JDBC 输入插件

通常,此插件的用途是连接 JDBC 实例并执行查询并将数据传递到后续阶段,即过滤和输出。

您可以在此处找到 JDBC 插件的所有选项

logstash 的配置是 conf 文件。

input {
  jdbc {
    jdbc_driver_library => "<Path to>/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5434/<schema>"
    jdbc_user => <username>
    jdbc_password => <password>
    jdbc_paging_enabled => true
    #schedule => "* * * * * *"
    statement => "select user_id as userId, first_name as firstName, last_name as lastName, date_of_birth as dateOfBirth, city as city, country as country, st_asgeojson(location) as location, created_at as createdAt, updated_at as updatedAt, additional_data::text as additionalData from users"
    use_column_value => true
    tracking_column => "userId"
  }
}

tracking_column 对我来说是主键。

首先,让我们假设没有像 Geometry 和 JSONB 这样的特殊字段,只有 text、number、boolean 和 date 字段。在这种情况下,不需要任何过滤器。您可以直接拥有一个输出插件。

output {
  stdout {  
        codec => json_lines  
    } 
  elasticsearch {
      hosts => ["localhost:9200"]
      index => "users_test"
      document_id => "%{userId}"
  }
}

但是,如果您的表具有几何、JSONB 或任何其他特殊类型的数据类型,而没有任何过滤器,您将获得以下异常。

JDBC 插件 - 缺少对完整类名称 = org.postgresql.util.PGobject、简单名称 = PGobject 的转换器处理

如果遇到此异常,则必须以文本形式获取数据并根据您的要求对其进行解析。

如何解决几何和 JSONB 的这个问题。

PostGIS 中的 Geometry 等价于 geo_point 或 geo_shape。我将选择geo_point,因为我在PostGIS 中采用了POINT。

geo_point可以以 5 种不同的方式存储。
请看这个链接

As an Object

{
   "point": { 
       "lat": 41.12,
       "lon": -71.34
    }
}

As String

{
    "point": "41.12,-71.34"
}

As GEOHASH

{
    "point": "drm3btev3e86"
}

As an Array

{
    "point": [ -71.34, 41.12 ]
}

As WKT POINT primitive

{
    "point" : "POINT (-71.34 41.12)"
}

我将在选项 4 和 5 中进行解释。

如何在 ES 中存储 JSONB

JSON 数据可以存储为 Object 或 Flattenned 类型。

为 GEO POINT 应用过滤器

有很多方法可以应用过滤器,但是使用 ruby 过滤器我们可以

filter {
    ruby {
        code => "
            require 'json'
            begin
                point_json = JSON.parse(event.get('location'))
                event.set('lon', point_json['coordinates'][0])
                event.set('lat', point_json['coordinates'][1])
            rescue Exception => e
                event.tag('invalide boundaries json')
            end
        "
    }
}

为 JSON 对象应用过滤器

使用 Ruby JSON 函数,数据被解析为 JSON 对象。

filter {
    ruby {
        code => "
            require 'json'
            begin
                data_json = JSON.parse(event.get('additionaldata').to_s || {})
                event.set('data', data_json)
            rescue Exception => e
                event.tag('invalide boundaries json')
            end
        "
    }
}

我们需要另一个过滤器来转换和替换字段名(通常在 RDBMS 中,列名由“_”分隔。这将替换为驼峰式约定)。

filter {
  mutate {
      rename => {
        "data" => "additionalData"
      }
      replace => {
        "location" => "POINT(%{lon} %{lat})"
      }
      rename => {
        "userid" => "userId"
      }
      rename => {
        "firstname" => "firstName"
      }
      rename => {
        "lastname" => "lastName"
      }
      rename => {
        "dateofbirth" => "dateOfBirth"
      }
      rename => {
        "createdat" => "createdAt"
      }
      rename => {
        "updatedat" => "updatedAt"
      }
      remove_field => ["lat", "lon", "@version", "@timestamp", "additionaldata", "data"]
    }
}

根据 RDBMS 为索引创建映射

如果您不创建映射,logstash 将根据第一个记录数据类型创建索引和映射。如果映射是自动创建的,可能会有一些偏差。所以总是建议创建一个映射。

PUT users_test
{
    "mappings" : {
      "properties" : {
        "userId" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "firstname" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "lastname" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "dateOfBirth" : {
          "type" : "date"
        },
        "city" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "country" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "location" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "createdAt" : {
          "type" : "date"
        },
        "updatedAt" : {
          "type" : "date"
        },
        "additionalData" : {
          "type" : "object"
        }
      }
  }
}

完整的 logstash 配置文件

input {
  jdbc {
    jdbc_driver_library => "/Users/ereshgorantla/Documents/Dev/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5434/address_service"
    jdbc_user => postgres
    jdbc_password => postgres
    jdbc_paging_enabled => true
    #schedule => "* * * * * *"
    statement => "select user_id as userId, first_name as firstName, last_name as lastName, date_of_birth as dateOfBirth, city as city, country as country, st_asgeojson(location) as location, created_at as createdAt, updated_at as updatedAt, additional_data::text as additionalData from users"
    use_column_value => true
    tracking_column => "userid"
  }
}

filter {
    ruby {
        code => "
            require 'json'
            begin
                point_json = JSON.parse(event.get('location'))
                data_json = JSON.parse(event.get('additionaldata').to_s || {})
                event.set('lon', point_json['coordinates'][0])
                event.set('lat', point_json['coordinates'][1])
                event.set('data', data_json)
            rescue Exception => e
                event.tag('invalide boundaries json')
            end
        "
    }
}

filter {
  mutate {
      rename => {
        "data" => "additionalData"
      }
      replace => {
        "location" => "POINT(%{lon} %{lat})"
      }
      rename => {
        "userid" => "userId"
      }
      rename => {
        "firstname" => "firstName"
      }
      rename => {
        "lastname" => "lastName"
      }
      rename => {
        "dateofbirth" => "dateOfBirth"
      }
      rename => {
        "createdat" => "createdAt"
      }
      rename => {
        "updatedat" => "updatedAt"
      }
      remove_field => ["lat", "lon", "@version", "@timestamp", "additionaldata", "data"]
    }
}

output {
  stdout {  
        codec => json_lines  
    } 
  elasticsearch {
      hosts => ["localhost:9200"]
      index => "users_test"
      document_id => "%{userId}"
      #point => POINT("%{lon}" "%{lat}")
  }
}

这是日志文件。我在日志文件中截断了数据日志。

Using JAVA_HOME defined java: /Library/Java/JavaVirtualMachines/jdk-11.0.9.jdk/Contents/Home
WARNING, using JAVA_HOME while Logstash distribution comes with a bundled JDK
Sending Logstash logs to /Users/ereshgorantla/Documents/Dev/logstash-7.13.0/logs which is now configured via log4j2.properties
[2021-06-21T18:03:54,197][INFO ][logstash.runner          ] Log4j configuration path used is: /Users/ereshgorantla/Documents/Dev/logstash-7.13.0/config/log4j2.properties
[2021-06-21T18:03:54,205][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.13.0", "jruby.version"=>"jruby 9.2.16.0 (2.5.7) 2021-03-03 f82228dc32 Java HotSpot(TM) 64-Bit Server VM 11.0.9+7-LTS on 11.0.9+7-LTS +indy +jit [darwin-x86_64]"}
[2021-06-21T18:03:54,262][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-06-21T18:03:54,726][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2021-06-21T18:03:55,251][INFO ][org.reflections.Reflections] Reflections took 33 ms to scan 1 urls, producing 24 keys and 48 values 
[2021-06-21T18:03:56,014][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2021-06-21T18:03:56,239][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2021-06-21T18:03:56,362][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2021-06-21T18:03:56,400][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch version determined (7.12.1) {:es_version=>7}
[2021-06-21T18:03:56,401][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2021-06-21T18:03:56,494][INFO ][logstash.outputs.elasticsearch][main] Using a default mapping template {:es_version=>7, :ecs_compatibility=>:disabled}
[2021-06-21T18:03:56,521][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1500, "pipeline.sources"=>["/Users/ereshgorantla/Documents/Dev/logstash-7.13.0/bin/users_logstash.conf"], :thread=>"#<Thread:0xfccb4f5 run>"}
[2021-06-21T18:03:57,276][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.75}
[2021-06-21T18:03:57,375][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2021-06-21T18:03:57,420][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-06-21T18:03:57,982][INFO ][logstash.inputs.jdbc     ][main][eed2191982f2fcf3eb078f1532ac1dbad93c3ff95e53bd7295f3d8f9a20e8ae9] (0.012941s) SELECT CAST(current_setting('server_version_num') AS integer) AS v
[2021-06-21T18:03:58,084][INFO ][logstash.inputs.jdbc     ][main][eed2191982f2fcf3eb078f1532ac1dbad93c3ff95e53bd7295f3d8f9a20e8ae9] (0.002953s) SELECT count(*) AS "count" FROM (select user_id as userId, first_name as firstName, last_name as lastName, date_of_birth as dateOfBirth, city as city, country as country, st_asgeojson(location) as location, created_at as createdAt, updated_at as updatedAt, additional_data::text as additionalData from users) AS "t1" LIMIT 1
[2021-06-21T18:03:58,136][INFO ][logstash.inputs.jdbc     ][main][eed2191982f2fcf3eb078f1532ac1dbad93c3ff95e53bd7295f3d8f9a20e8ae9] (0.032835s) SELECT * FROM (select user_id as userId, first_name as firstName, last_name as lastName, date_of_birth as dateOfBirth, city as city, country as country, st_asgeojson(location) as location, created_at as createdAt, updated_at as updatedAt, additional_data::text as additionalData from users) AS "t1" LIMIT 100000 OFFSET 0
{"country":"India","location":"POINT(55.470544 -101.12345)","city":"Hyderabad","updatedAt":"2021-06-17T11:44:08.954Z","createdAt":"2021-06-17T11:30:39.423Z","lastName":"aed6f9491c","additionalData":{"hadKids":true,"status":"Married"},"firstName":"6bbcbd05c5","dateOfBirth":"1979-06-16T18:30:00.000Z","userId":"3d2212ac-7347-482d-98a8-81301f817e65"}
{"country":"India","location":"POINT(55.470544 -101.12345)","city":"Hyderabad","updatedAt":"2021-06-17T11:44:08.954Z","createdAt":"2021-06-17T11:30:39.423Z","lastName":"337360cb29","additionalData":{"hadKids":true,"status":"Married"},"firstName":"eb552550a4","dateOfBirth":"1981-06-16T18:30:00.000Z","userId":"d34785f2-debb-47bc-ac5c-cd6fb14817ed"}
-------
[2021-06-21T18:03:59,463][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2021-06-21T18:03:59,964][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2021-06-21T18:03:59,989][INFO ][logstash.runner          ] Logstash shut down.

转载:https://medium.com/geekculture/migrate-geometry-and-jsonb-data-from-postgres-to-elasticsearch-from-logstash-jdbc-input-plugin-d304a8ad47a6

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容