clickhouse与kafka集成

clickhouse支持与多种存储引擎集成,可以从集成的引擎里面读取消息,然后写到真正的数据存储表里。

clickhouse批量写入的性能比较好,我们的业务场景下会大批量的产生数据,如果使用clickhouse-jdbc去写的,写入时机和每批次写入的数量不好把控,最终选择了先将消息写入kafka,然后由clickhouse从kafka消费数据,clickhouse server消费到数据之后写入真正的数据表。

clickhouse集成kafka引擎见官方文档:
https://clickhouse.com/docs/zh/engines/table-engines/integrations/kafka/

下面的介绍会与官方文档有重复,然后补充一些集成过程中遇到的坑。

下面介绍clickhouse与kafka集成的步骤,clickhouse版本是22.1.3.7

集成kafka

参数解释

必要参数

  • kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
  • kafka_topic_list – topic 列表 (my_topic)。
  • kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
  • kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。了解详细信息,请参考 Formats 部分。

可选参数

  • kafka_row_delimiter - 每个消息体(记录)之间的分隔符。

  • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。例如,普罗托船长 需要 schema 文件路径以及根对象 schema.capnp:Message 的名字。

  • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

关于必选参数中的kafka_format参数,参见Formats部分,format具体解释如下
https://clickhouse.com/docs/zh/interfaces/formats/

JSONEachRow, JSONStringsEachRow, JSONCompactEachRow, JSONCompactStringsEachRow
这几种格式,ClickHouse会将行输出为用换行符分隔的JSON值,这些输出数据作为一个整体时,由于没有分隔符(,)因而不是有效的JSON文档。
官方文档给了一些示例。

{"some_int":42,"some_str":"hello","some_tuple":[1,"a"]} // JSONEachRow
[42,"hello",[1,"a"]] // JSONCompactEachRow
["42","hello","(2,'a')"] // JSONCompactStringsEachRow

由于我的真实的数据表,有一个字段是json类型的字符串,但是一开始设置kafka_format的类型为JSONEachRow时,从kafka消费数据会报错,所以kafka_format格式设置成了JSONAsString,具体的错误后面贴出来。

创建引擎表

创建kafka引擎表,用于从kafka消费数据

CREATE TABLE msg_json_source (
    msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic_test',
kafka_group_name = 'topic_test_consumer',
kafka_format = 'JSONAsString'

由于我的数据结构里有嵌套json,如果使用JSONEachRow,有个字段是json类型的字符串,带转义字符,导致clickhouse解析失败,没找到解决办法,所以使用了JSONAsString格式。

创建真实数据表

CREATE TABLE msg_target
(
  biz Nullable(String),
  sender_id String,
  msg_id UInt64,
  status String,
  status_time UInt64,
  content String,
  event_time DateTime Default now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (msg_id, status_time)
TTL event_time + INTERVAl 1 YEAR

一个简单的MergeTree引擎的表,其中content是json格式的字符串。

创建物化视图

CREATE MATERIALIZED VIEW msg_json_source_consumer TO msg_target
AS SELECT
 JSONExtractString(msg,'biz') as biz,
 JSONExtractString(msg,'sender_id') as sender_id,
 JSONExtractUInt(msg,'msg_id') as msg_id,
 JSONExtractString(msg,'status') as status,
 JSONExtractUInt(msg,'status_time') as status_time,
 JSONExtractString(msg,'content') as content
FROM msg_json_source

创建的物化视图用于把从kafka消费到的数据,写到真实的数据表里,在这个例子里,msg_json_source从kafka消费到数据,然后通过物化视图msg_json_source_consumer将消费到的数据写到真实的数据表msg_target中。

由于从kafka消费到的数据就是一个json字符串,在这里使用JSONExtractString等json字段提取工具,提取msg里的字段,比如biz,sender_id,content等字段。

status_time原本计划用DatTime64类型的,但是这个时间格式有坑,最终选择了使用UInt64存毫秒级时间戳,具体的问题下面再介绍。

往kafka写消息

在clickhouse创建好3张表之后(kafka引擎表,真实数据表,物化视图表),往kafka发消息
本地安装一个简易的kafka服务端,然后创建topic

/opt/dev/confluent-7.0.1/bin/ 

./kafka-topics --create --topic topic_test --bootstrap-server localhost:9092
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic topic_test.

创建好topic之后,使用Java客户端往kafka发消息,使用confluent client发也可以。
添加kafka依赖

 <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.1.0</version>
        </dependency>

实体类,使用fastjson的@JSONField注解,实体类转字符串的时候,将驼峰转换为下划线

import com.alibaba.fastjson.annotation.JSONField;

public class MsgJsonSource {
    private String biz;

    @JSONField(name = "sender_id")
    private String senderId;

    @JSONField(name = "msg_id")
    private Long msgId;

    private String status;

    @JSONField(name = "status_time")
    private Long statusTime;

    private Map<String,Object> content;

    //省略getter/setter
}

测试类

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fc.model.MsgJsonSource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;

public class KafkaSendTest {
    public static void main(String[] args) {

        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        System.out.println("开始发送数据");
        // 4. 调用 send 方法,发送消息
        String topic = "topic_test";
        for (int i = 0; i < 10; i++) {
            
            msgJsonSource.setContent(content);
            
            Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send(new ProducerRecord<>(topic, JSON.toJSONString(msgJsonSource)));
            System.out.println("当前消息序号: " + i + ", 发送结果: " + JSON.toJSONString(recordMetadataFuture));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

最终发送完,我们查看一下clickhouse里的数据表的数据,可以发现我们发送到kakfa里的数据,已经成功的消费,并且写入到真实的数据表里了。

image.png

遇到的问题

版本问题

当时测试环境部署的版本是21.9,但是这个版本有问题,不推荐安装,建议直接部署22以上的clickhouse

JSONEachRow

我一开始就是使用的JSONEachRow格式,但是我的消息体里还有嵌套的json,类似下面这种格式,里面有个字段还是个json,转行成字符串带转义字符。
然后消息体的string字符串贴一条在这里

{"biz":"biz","content":"{\"current\":1648817159914,\"abc\":\"111\",\"text\":\"gggg\",\"req\":{\"nested\":0}}","msg_id":7561248342312669573,"sender_id":"test_0","status":"status_0","status_time":1648817159914}

然后clickhouse解析消息体报错,当时的错找不到了,现在复现不出来了,非常的难顶。。。。
后来因为赶版本的原因把kafka_format换成了JSONAsString。

时间格式问题

clickhouse是支持DateTime64格式的,可以到毫秒级,但是实际使用过程中却有些坑在,

首先是有的客户端解析毫秒字符串有问题,其次是使用JSONExtract*的方法,会有差异,再然后是jdbc查询的时候,也会导致时间查询有问题。
拿毫秒时间戳和秒级时间戳做试验,clickhouse-server版本是22.3.1.1

把上面的kafka引擎表拿出来改一下

#kafka引擎
CREATE TABLE msg_json_source_004 (
    msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic_test_004',
kafka_group_name = 'topic_test_004_consumer',
kafka_format = 'JSONAsString'

#真实数据表
CREATE TABLE msg_target_004
(
  biz Nullable(String),
  sender_id String,
  msg_id UInt64,
  status String,
  status_time DateTime64(3, 'Asia/Shanghai'),
  content String,
  event_time DateTime Default now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (msg_id, status_time)
TTL event_time + INTERVAl 1 YEAR

#物化视图
CREATE MATERIALIZED VIEW msg_json_source_004_consumer TO msg_target_004
AS SELECT
 JSONExtractString(msg,'biz') as biz,
 JSONExtractString(msg,'sender_id') as sender_id,
 JSONExtractUInt(msg,'msg_id') as msg_id,
 JSONExtractString(msg,'status') as status,
 JSONExtractUInt(msg,'status_time') as status_time,
 JSONExtractString(msg,'content') as content
FROM msg_json_source_004

其中status_time这个字段的类型改成DateTime64(3, 'Asia/Shanghai'),使用JSONExtractUInt提取时间,看下效果

首先发条数据,数据内容如下

{"biz":"biz","content":"1111","msg_id":7253917707485514799,"sender_id":"test_0","status":"status_0","status_time":1648901022355}

传入的是毫秒级时间戳,然后数据表存储的时候就变成了2282年


image.png

然后如果传入秒级的时间戳,真实的数据是这样

{"biz":"biz_3","content":"00003","msg_id":3053111163555706035,"sender_id":"test_3","status":"status_3","status_time":1648901508}

clickhouse存储的时候看着时间正常了,但是毫秒丢失了


image.png

然后修改一下物化视图的字段提取方式,之前是 JSONExtractUInt(msg,'status_time') as status_time,现在改成使用 JSONExtractString(msg,'status_time') as status_time提取时间
会发现时间类型又正常了。

这一条数据内容如下

{"biz":"biz_1","content":"2222","msg_id":9151431369051819265,"sender_id":"test_1","status":"status_1","status_time":1648901194855}

最终使用JSONExtractString提取毫秒时间戳,得到了正确的DateTime64的时间,非常的神奇


image.png

最终我决定来了个釜底抽薪的方法,时间直接用UInt64存,因为我发送出去的数据是毫秒级时间戳,最终存时间戳,查询时间范围的时候直接用long类型的数据between好了。

这也是无奈之举,万一哪天server更新版本,导致时间出现问题,那就完蛋了,希望后面时间可以稳定一点吧。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容