This post explains how to migrate data from a Postgres table to Elasticsearch, including geometry and JSONB columns, using Logstash. I recently started using Elasticsearch for one of our application use cases, to avoid cross-database joins between microservices.
Logstash helps us seed data from different sources into Elasticsearch indices. Here, our source is a Postgres database instance. We have a broad set of use cases, one of which is migrating geometry and JSONB data into an Elasticsearch index.
Being new to Logstash, I had to spend a few hours cracking this: understanding the JDBC input plugin and the other filters in Logstash. Google didn't offer much help on migrating the special types (geometry and JSONB), so I wanted to share my experience.
What is Logstash?
Logstash dynamically ingests, transforms, and ships your data regardless of format or complexity. As data travels from source to store, Logstash filters parse each event, identify named fields to build structure, and transform them into a common format for more powerful analysis and business value.
Regardless of format or complexity, Logstash dynamically transforms and prepares your data:
- Derive structure from unstructured data with grok
- Decipher geo coordinates from IP addresses
- Anonymize PII data and exclude sensitive fields entirely
- Ease overall processing, independent of the data source, format, or schema
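For example, a minimal filter block that derives structure from a raw access-log line and enriches it with geo data from the client IP could look like the sketch below (the pattern and field names are illustrative, not part of this use case):
filter {
  grok {
    # split an unstructured log line into named fields
    match => { "message" => "%{IPORHOST:client_ip} %{WORD:http_method} %{URIPATHPARAM:request}" }
  }
  geoip {
    # derive geo coordinates from the extracted IP address
    source => "client_ip"
  }
}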
Here is the use case.
How to install it locally?
To set this up locally, I created a docker-compose file for Elasticsearch and Kibana, and installed Logstash separately on my machine.
Docker-compose file for the ES cluster, with Kibana as the UI:
version: '2.2'
services:
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data02:/usr/share/elasticsearch/data
    networks:
      - elastic
  es03:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
  kib01:
    image: docker.elastic.co/kibana/kibana:7.12.1
    container_name: kib01
    ports:
      - 5601:5601
    environment:
      ELASTICSEARCH_URL: http://es01:9200
      ELASTICSEARCH_HOSTS: '["http://es01:9200","http://es02:9200","http://es03:9200"]'
    networks:
      - elastic

volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge
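Bring the stack up with docker-compose; the es01 node exposes Elasticsearch on port 9200 and Kibana is available on 5601:
docker-compose up -d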
Three ES nodes are used in total.
I installed and started Logstash locally by following this link.
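Once the containers are up, the cluster can be verified from Kibana Dev Tools (or with curl against http://localhost:9200), for example:
GET _cluster/health
GET _cat/nodes?v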
Migrating data from Postgres to Elasticsearch
For this we use Logstash. Before that, let's look at the database.
I created a table named **users**, which contains geometry and JSONB columns.
CREATE TABLE public.users (
  user_id uuid NOT NULL DEFAULT uuid_generate_v4(),
  first_name text NOT NULL,
  last_name text NULL,
  date_of_birth date NULL,
  city text NULL,
  country text NOT NULL,
  location geometry NOT NULL,
  created_at timestamp NOT NULL DEFAULT now(),
  updated_at timestamp NULL,
  additional_data jsonb NOT NULL DEFAULT '{}'::jsonb,
  CONSTRAINT users_pkey PRIMARY KEY (user_id)
);
I inserted around 500 test records, just enough to exercise Logstash and ES.
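A single test row looked roughly like the following (the values are illustrative; ST_MakePoint takes longitude first):
INSERT INTO public.users (first_name, last_name, date_of_birth, city, country, location, additional_data)
VALUES (
  'John', 'Doe', '1985-04-12', 'Hyderabad', 'India',
  ST_SetSRID(ST_MakePoint(78.4867, 17.3850), 4326),   -- PostGIS POINT(lon lat)
  '{"status": "Married", "hadKids": true}'::jsonb
);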
In Logstash, a data pipeline typically has three parts: input, filter, and output. For our use case:
Input → RDBMS (JDBC plugin)
Filter → we use the ruby and mutate filters
Output → Elasticsearch
JDBC input plugin
This plugin connects to a database over JDBC, runs a query, and passes the resulting rows on to the subsequent stages, i.e. the filters and the output.
You can find all the options of the JDBC plugin here.
Logstash is configured through a conf file.
input {
  jdbc {
    jdbc_driver_library => "<Path to>/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5434/<schema>"
    jdbc_user => "<username>"
    jdbc_password => "<password>"
    jdbc_paging_enabled => true
    #schedule => "* * * * * *"
    statement => "select user_id as userId, first_name as firstName, last_name as lastName, date_of_birth as dateOfBirth, city as city, country as country, st_asgeojson(location) as location, created_at as createdAt, updated_at as updatedAt, additional_data::text as additionalData from users"
    use_column_value => true
    tracking_column => "userid"
  }
}
tracking_column in my case is the primary key (Postgres folds the unquoted alias userId to lowercase, so the tracked column is userid).
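That is enough for a one-off migration. If you want the pipeline to keep running and pick up only changed rows, a possible sketch (my assumption, not part of the original setup) is to enable the schedule and track updated_at with the plugin's :sql_last_value placeholder:
input {
  jdbc {
    # ... connection settings as above ...
    # run every 5 minutes
    schedule => "*/5 * * * *"
    statement => "select * from users where updated_at > :sql_last_value"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
  }
}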
First, let's assume there are no special fields like geometry and JSONB, only text, number, boolean, and date fields. In that case no filter is needed, and you can go straight to the output plugin.
output {
  stdout {
    codec => json_lines
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "users_test"
    document_id => "%{userId}"
  }
}
But if your table has geometry, JSONB, or any other special data type and you don't use any filters, you will get the following exception:
JDBC plugin - Missing Converter handling for full class name=org.postgresql.util.PGobject, simple name=PGobject
If you hit this exception, you have to fetch the data as text and parse it according to your requirements.
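That is exactly why the SQL statement above selects st_asgeojson(location) and additional_data::text: both columns then reach Logstash as plain strings, roughly like this (illustrative values):
select st_asgeojson(location) as location, additional_data::text as additionaldata from users limit 1;
-- location       -> {"type":"Point","coordinates":[78.4867,17.385]}
-- additionaldata -> {"status": "Married", "hadKids": true}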
How to solve this for geometry and JSONB
Geometry in PostGIS maps to geo_point or geo_shape in Elasticsearch. I will go with geo_point, since I used POINT in PostGIS.
A geo_point can be stored in 5 different ways (see this link):
As an Object
{
"point": {
"lat": 41.12,
"lon": -71.34
}
}
As String
{
"point": "41.12,-71.34"
}
As GEOHASH
{
"point": "drm3btev3e86"
}
As an Array
{
"point": [ -71.34, 41.12 ]
}
As WKT POINT primitive
{
"point" : "POINT (-71.34 41.12)"
}
I will explain using options 4 and 5.
How to store JSONB in ES
JSON data can be stored as an Object or as a Flattened type.
Applying a filter for the geo point
There are many ways to apply filters; here we use a ruby filter to parse the GeoJSON string produced by st_asgeojson and pull out the longitude and latitude:
filter {
  ruby {
    code => "
      require 'json'
      begin
        point_json = JSON.parse(event.get('location'))
        event.set('lon', point_json['coordinates'][0])
        event.set('lat', point_json['coordinates'][1])
      rescue Exception => e
        event.tag('invalide boundaries json')
      end
    "
  }
}
Applying a filter for the JSON object
Using Ruby's JSON module, the JSONB text is parsed into a JSON object:
filter {
  ruby {
    code => "
      require 'json'
      begin
        data_json = JSON.parse(event.get('additionaldata') || '{}')
        event.set('data', data_json)
      rescue Exception => e
        event.tag('invalide boundaries json')
      end
    "
  }
}
We need one more filter to rename the fields. Although the SQL statement aliases the columns in camelCase, Postgres folds unquoted identifiers to lowercase, so the fields arrive as userid, firstname, and so on; the mutate filter renames them back to camelCase, replaces location with the WKT point built from lon and lat, and removes the temporary fields.
filter {
  mutate {
    rename => {
      "data" => "additionalData"
      "userid" => "userId"
      "firstname" => "firstName"
      "lastname" => "lastName"
      "dateofbirth" => "dateOfBirth"
      "createdat" => "createdAt"
      "updatedat" => "updatedAt"
    }
    replace => {
      "location" => "POINT(%{lon} %{lat})"
    }
    remove_field => ["lat", "lon", "@version", "@timestamp", "additionaldata", "data"]
  }
}
Create the index mapping based on the RDBMS schema
If you don't create a mapping, Logstash will create the index and its mapping based on the data types of the first record, and an auto-created mapping can deviate from what you want. It is therefore always recommended to create the mapping yourself.
PUT users_test
{
  "mappings" : {
    "properties" : {
      "userId" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "firstName" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "lastName" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "dateOfBirth" : {
        "type" : "date"
      },
      "city" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "country" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "location" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "createdAt" : {
        "type" : "date"
      },
      "updatedAt" : {
        "type" : "date"
      },
      "additionalData" : {
        "type" : "object"
      }
    }
  }
}
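A side note that is not part of the original run: since the filter emits location as a WKT string and geo_point accepts WKT (option 5 above), you could instead map location as geo_point and additionalData as flattened if you want geo queries and leaner handling of arbitrary JSON keys. A minimal sketch of those two properties, assuming all coordinates are valid lon/lat values:
PUT users_test
{
  "mappings": {
    "properties": {
      "location": { "type": "geo_point" },
      "additionalData": { "type": "flattened" }
    }
  }
}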
The complete Logstash config file
input {
  jdbc {
    jdbc_driver_library => "/Users/ereshgorantla/Documents/Dev/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5434/address_service"
    jdbc_user => "postgres"
    jdbc_password => "postgres"
    jdbc_paging_enabled => true
    #schedule => "* * * * * *"
    statement => "select user_id as userId, first_name as firstName, last_name as lastName, date_of_birth as dateOfBirth, city as city, country as country, st_asgeojson(location) as location, created_at as createdAt, updated_at as updatedAt, additional_data::text as additionalData from users"
    use_column_value => true
    tracking_column => "userid"
  }
}

filter {
  ruby {
    code => "
      require 'json'
      begin
        point_json = JSON.parse(event.get('location'))
        data_json = JSON.parse(event.get('additionaldata') || '{}')
        event.set('lon', point_json['coordinates'][0])
        event.set('lat', point_json['coordinates'][1])
        event.set('data', data_json)
      rescue Exception => e
        event.tag('invalide boundaries json')
      end
    "
  }
}

filter {
  mutate {
    rename => {
      "data" => "additionalData"
      "userid" => "userId"
      "firstname" => "firstName"
      "lastname" => "lastName"
      "dateofbirth" => "dateOfBirth"
      "createdat" => "createdAt"
      "updatedat" => "updatedAt"
    }
    replace => {
      "location" => "POINT(%{lon} %{lat})"
    }
    remove_field => ["lat", "lon", "@version", "@timestamp", "additionaldata", "data"]
  }
}

output {
  stdout {
    codec => json_lines
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "users_test"
    document_id => "%{userId}"
    #point => POINT("%{lon}" "%{lat}")
  }
}
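With the config saved (here as users_logstash.conf), the pipeline is run with the logstash binary, for example:
bin/logstash -f users_logstash.conf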
Here is the log output. I have truncated the per-record data lines.
Using JAVA_HOME defined java: /Library/Java/JavaVirtualMachines/jdk-11.0.9.jdk/Contents/Home
WARNING, using JAVA_HOME while Logstash distribution comes with a bundled JDK
Sending Logstash logs to /Users/ereshgorantla/Documents/Dev/logstash-7.13.0/logs which is now configured via log4j2.properties
[2021-06-21T18:03:54,197][INFO ][logstash.runner ] Log4j configuration path used is: /Users/ereshgorantla/Documents/Dev/logstash-7.13.0/config/log4j2.properties
[2021-06-21T18:03:54,205][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.13.0", "jruby.version"=>"jruby 9.2.16.0 (2.5.7) 2021-03-03 f82228dc32 Java HotSpot(TM) 64-Bit Server VM 11.0.9+7-LTS on 11.0.9+7-LTS +indy +jit [darwin-x86_64]"}
[2021-06-21T18:03:54,262][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-06-21T18:03:54,726][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2021-06-21T18:03:55,251][INFO ][org.reflections.Reflections] Reflections took 33 ms to scan 1 urls, producing 24 keys and 48 values
[2021-06-21T18:03:56,014][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2021-06-21T18:03:56,239][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2021-06-21T18:03:56,362][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2021-06-21T18:03:56,400][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch version determined (7.12.1) {:es_version=>7}
[2021-06-21T18:03:56,401][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2021-06-21T18:03:56,494][INFO ][logstash.outputs.elasticsearch][main] Using a default mapping template {:es_version=>7, :ecs_compatibility=>:disabled}
[2021-06-21T18:03:56,521][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1500, "pipeline.sources"=>["/Users/ereshgorantla/Documents/Dev/logstash-7.13.0/bin/users_logstash.conf"], :thread=>"#<Thread:0xfccb4f5 run>"}
[2021-06-21T18:03:57,276][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>0.75}
[2021-06-21T18:03:57,375][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2021-06-21T18:03:57,420][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-06-21T18:03:57,982][INFO ][logstash.inputs.jdbc ][main][eed2191982f2fcf3eb078f1532ac1dbad93c3ff95e53bd7295f3d8f9a20e8ae9] (0.012941s) SELECT CAST(current_setting('server_version_num') AS integer) AS v
[2021-06-21T18:03:58,084][INFO ][logstash.inputs.jdbc ][main][eed2191982f2fcf3eb078f1532ac1dbad93c3ff95e53bd7295f3d8f9a20e8ae9] (0.002953s) SELECT count(*) AS "count" FROM (select user_id as userId, first_name as firstName, last_name as lastName, date_of_birth as dateOfBirth, city as city, country as country, st_asgeojson(location) as location, created_at as createdAt, updated_at as updatedAt, additional_data::text as additionalData from users) AS "t1" LIMIT 1
[2021-06-21T18:03:58,136][INFO ][logstash.inputs.jdbc ][main][eed2191982f2fcf3eb078f1532ac1dbad93c3ff95e53bd7295f3d8f9a20e8ae9] (0.032835s) SELECT * FROM (select user_id as userId, first_name as firstName, last_name as lastName, date_of_birth as dateOfBirth, city as city, country as country, st_asgeojson(location) as location, created_at as createdAt, updated_at as updatedAt, additional_data::text as additionalData from users) AS "t1" LIMIT 100000 OFFSET 0
{"country":"India","location":"POINT(55.470544 -101.12345)","city":"Hyderabad","updatedAt":"2021-06-17T11:44:08.954Z","createdAt":"2021-06-17T11:30:39.423Z","lastName":"aed6f9491c","additionalData":{"hadKids":true,"status":"Married"},"firstName":"6bbcbd05c5","dateOfBirth":"1979-06-16T18:30:00.000Z","userId":"3d2212ac-7347-482d-98a8-81301f817e65"}
{"country":"India","location":"POINT(55.470544 -101.12345)","city":"Hyderabad","updatedAt":"2021-06-17T11:44:08.954Z","createdAt":"2021-06-17T11:30:39.423Z","lastName":"337360cb29","additionalData":{"hadKids":true,"status":"Married"},"firstName":"eb552550a4","dateOfBirth":"1981-06-16T18:30:00.000Z","userId":"d34785f2-debb-47bc-ac5c-cd6fb14817ed"}
-------
[2021-06-21T18:03:59,463][INFO ][logstash.javapipeline ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2021-06-21T18:03:59,964][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2021-06-21T18:03:59,989][INFO ][logstash.runner ] Logstash shut down.
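After the run, the documents can be verified from Kibana Dev Tools, for example with a count and a quick search against the users_test index:
GET users_test/_count

GET users_test/_search
{
  "query": {
    "match": {
      "city": "Hyderabad"
    }
  }
}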