golang如何使用sarama访问kafka

https://www.jianshu.com/p/3102418e5a7d

golang如何使用sarama访问kafka
golang连接kafka有三种client认证方式:

无认证
TLS认证
SASL/PLAIN认证, (其他SASL/SCRAM, SASL/GSSAPI都不支持)

下面一个客户端代码例子访问kafka服务器,来发送和接受消息。

使用方式


命令行参数

$ ./kafkaclient -h
Usage of ./client:
  -ca string
        CA Certificate (default "ca.pem")
  -cert string
        Client Certificate (default "cert.pem")
  -command string
        consumer|producer (default "consumer")
  -host string
        Common separated kafka hosts (default "localhost:9093")
  -key string
        Client Key (default "key.pem")
  -partition int
        Kafka topic partition
  -password string
        SASL password
  -sasl
        SASL enable
  -tls
        TLS enable
  -topic string
        Kafka topic (default "test--topic")
  -username string
        SASL username


作为producer启动

$ ./kafkaclient -command producer \
  -host kafka1:9092,kafka2:9092

## SASL/PLAIN enable
$ ./kafkaclient -command producer \
  -sasl -username kafkaclient -password kafkapassword \
  -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command producer \
  -tls -cert client.pem -key client.key -ca ca.pem \
  -host kafka1:9093,kafka2:9093

producer发送消息给kafka:
> aaa
2018/12/15 07:11:21 Produced message: [aaa]
> bbb
2018/12/15 07:11:30 Produced message: [bbb]
> quit


作为consumer启动

$ ./kafkaclient -command consumer \
  -host kafka1:9092,kafka2:9092

## SASL/PLAIN enabled
$ ./kafkaclient -command consumer \
  -sasl -username kafkaclient -password kafkapassword \
  -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command consumer \
  -tls -cert client.pem -key client.key -ca ca.pem \
  -host kafka1:9093,kafka2:9093

consumer从kafka接受消息:
2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]


完整源代码如下

这个代码使用到了Shopify/sarama库,请自行下载使用。
$ cat kafkaclient.go
package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "io/ioutil"
    "bufio"
    "strings"

    "crypto/tls"
    "crypto/x509"

    "github.com/Shopify/sarama"
)

var (
    command     string
    hosts       string
    topic       string
    partition   int
    saslEnable  bool
    username    string
    password    string
    tlsEnable   bool
    clientcert  string
    clientkey   string
    cacert      string
)

func main() {
    flag.StringVar(&command,    "command",      "consumer",         "consumer|producer")
    flag.StringVar(&hosts,      "host",         "localhost:9093",   "Common separated kafka hosts")
    flag.StringVar(&topic,      "topic",        "test--topic",      "Kafka topic")
    flag.IntVar(&partition,     "partition",    0,                  "Kafka topic partition")

    flag.BoolVar(&saslEnable,   "sasl",          false,             "SASL enable")
    flag.StringVar(&username,   "username",      "",                "SASL Username")
    flag.StringVar(&password,   "password",      "",                "SASL Password")

    flag.BoolVar(&tlsEnable,    "tls",          false,              "TLS enable")
    flag.StringVar(&clientcert, "cert",         "cert.pem",         "Client Certificate")
    flag.StringVar(&clientkey,  "key",          "key.pem",          "Client Key")
    flag.StringVar(&cacert,     "ca",           "ca.pem",           "CA Certificate")
    flag.Parse()

    config := sarama.NewConfig()
    if saslEnable {
        config.Net.SASL.Enable   = true
        config.Net.SASL.User     = username
        config.Net.SASL.Password = password
    }

    if tlsEnable {
        //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
        tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
        if err != nil {
            log.Fatal(err)
        }

        config.Net.TLS.Enable = true
        config.Net.TLS.Config = tlsConfig
    }

    client, err := sarama.NewClient(strings.Split(hosts, ","), config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    if command == "consumer" {
        consumer, err := sarama.NewConsumerFromClient(client)
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()
        loopConsumer(consumer, topic, partition)
    } else {
        producer, err := sarama.NewAsyncProducerFromClient(client)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()
        loopProducer(producer, topic, partition)
    }
}

func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
    // load client cert
    clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
    if err != nil {
        return nil, err
    }

    // load ca cert pool
    cacert, err := ioutil.ReadFile(cacertfile)
    if err != nil {
        return nil, err
    }
    cacertpool := x509.NewCertPool()
    cacertpool.AppendCertsFromPEM(cacert)

    // generate tlcconfig
    tlsConfig := tls.Config{}
    tlsConfig.RootCAs = cacertpool
    tlsConfig.Certificates = []tls.Certificate{clientcert}
    tlsConfig.BuildNameToCertificate()
 // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
    return &tlsConfig, err
}

func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
    scanner := bufio.NewScanner(os.Stdin)
    fmt.Print("> ")
    for scanner.Scan() {
        text := scanner.Text()
        if text == "" {
        } else if text == "exit" || text == "quit" {
            break
        } else {
            producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
            log.Printf("Produced message: [%s]\n",text)
        }
        fmt.Print("> ")
    }
}

func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
    partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
    if err != nil {
        log.Println(err)
        return
    }
    defer partitionConsumer.Close()

    for {
        msg := <-partitionConsumer.Messages()
        log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
    }
}

编译:
$ go build kafkaclient.go
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容