Pulsar 2.5.0 之Java client

Pulsar 2.5.0 之Java client

官网原文标题《Pulsar Java client》

翻译时间:2020-02-14

官网原文地址:http://pulsar.apache.org/docs/en/client-libraries-java/

译者:本文介绍如何使用javaClient创建生产者、消费者以及通过管理后台接口读取消息。


Pulsar Java client

通过Java client 可以创建生产者、消费者以及读取消息,当前API版本为2.5.0,包括两大块内容

描述 Maven Artifact
org.apache.pulsar.client.api 创建生产和创建消息者 API org.apache.pulsar:pulsar-client:2.5.0
org.apache.pulsar.client.admin admin API org.apache.pulsar:pulsar-client-admin:2.5.0

本章重点是创建生产和创建消息者 API如何使用,关于 admin client API 仔细阅读文档 Pulsar admin interface

Java Client 导入包方式

Maven

在pom.xml 文件添加如下

<!-- in your <properties> block -->
<pulsar.version>2.5.0</pulsar.version>

<!-- in your <dependencies> block -->
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>${pulsar.version}</version>
</dependency>

Gradle

build.gradle 文件添加如下信息

def pulsarVersion = '2.5.0'

dependencies {

compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion

}

Java Client URLS

客户端client 通过pulsar协议来进行通讯,类型列表如下

如何创建 PulsarClient 对象

默认示例:

PulsarClient client = PulsarClient.builder()

    .serviceUrl("pulsar://localhost:6650")

    .build();

多brokers示例

PulsarClient client = PulsarClient.builder()

    .serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")

   .build();

本地集群模式 默认 broker URL pulsar://localhost:6650

通过loadConf 可以进行自定义 PulsarClient 参数

参数配置

参数类型 参数名称 描述 默认值
String serviceUrl
String authPluginClassName
String authParams
long operationTimeoutMs 30000
long statsIntervalSeconds 60
int numIoThreads 1
int numListenerThreads 1
boolean useTcpNoDelay true
boolean useTls false
string tlsTrustCertsFilePath
boolean tlsAllowInsecureConnection false
boolean tlsHostnameVerificationEnable false
int concurrentLookupRequest 5000
int maxLookupRequest 50000
int maxNumberOfRejectedRequestPerConnection 50
int keepAliveIntervalSeconds 30
int connectionTimeoutMs 10000
int requestTimeoutMs 60000
int defaultBackoffIntervalNanos TimeUnit.MILLISECONDS.toNanos(100);
long maxBackoffIntervalNanos TimeUnit.SECONDS.toNanos(30)

Producer 对象创建与使用

发送消息,需要创建Producer对象对指定的topic发送消息

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();
Producer<byte[]> producer = client.newProducer()
    .topic("my-topic")
   .create();
*// You can then send messages to the broker and topic you specified:*
producer.send("My message".getBytes());

默认情况下 messages schema. 是字节数组,schema是可以根据自己的业务场景进行选择的。例如我们可以使用string 类型schema。

Producer<String> stringProducer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();
stringProducer.send("My message");

对象在使用完之后需要进行释放

producer.close();
consumer.close();
client.close();

关闭释放操作也有异步方法

producer.closeAsync()
  .thenRun(() -> System.out.println("Producer closed"))
  .exceptionally((ex) -> {
   System.err.println("Failed to close producer: " + ex);
    return null;
  });

Producer 对象可以使用默认配置,也可以自定义配置参数,自定义配置是通过loadConf 来进行配置。

参数类型 参数名称 描述 默认值
String topicName null
String producerName null
long sendTimeoutMs 30000
boolean blockIfQueueFull false
int maxPendingMessages 1000
int maxPendingMessagesAcrossPartitions 50000
MessageRoutingMode messageRoutingMode pulsar.RoundRobinDistribution
HashingScheme hashingScheme HashingScheme.JavaStringHash
ProducerCryptoFailureAction cryptoFailureAction ProducerCryptoFailureAction.FAIL
long batchingMaxPublishDelayMicros TimeUnit.MILLISECONDS.toMicros(1)
int batchingMaxMessages 1000
boolean batchingEnabled true
CompressionType compressionType No compression

Producer自定义配置示例:

Producer<byte[]> producer = client.newProducer()
  .topic("my-topic")
  .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
  .sendTimeout(10, TimeUnit.SECONDS)
  .blockIfQueueFull(true)
  .create();

如果Producer 创建了分片topic,发送消息时需要消息路由模式发送,消息路由了解更多,请阅读 Partitioned Topics cookbook.

异步发送消息,返回MessageId包装对象

producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
  System.out.printf("Message with ID %s successfully sent", msgId);
});

Message 对象自定义配置示例:

producer.newMessage()
  .key("my-message-key")
  .value("my-async-message".getBytes())
  .property("my-key", "my-value")
  .property("my-other-key", "my-other-value")
  .send();

还可以使用sendAsync()方法,这个方法有返回值


Consumer

通过PulsarClient 对象来创建Consumer对象,Consumer对象在创建的时候需要指定主题与订阅名称。对象创建完成,就可以来对订阅的主题进行消费

示例:

Consumer consumer = client.newConsumer()
    .topic("my-topic")
   .subscriptionName("my-subscription")
    .subscribe();

通过while 循环来监听与接收内容,如果接收失败,可以通过negativeAcknowledge方法,重新投递消息

while (true) {
 // Wait for a message
 Message msg = consumer.receive();
 try {
   // Do something with the message
   System.out.printf("Message received: %s", new String(msg.getData()));
   // Acknowledge the message so that it can be deleted by the message broker
  consumer.acknowledge(msg);
 } catch (Exception e) {
   // Message failed to process, redeliver later
   consumer.negativeAcknowledge(msg);
 }
}

可以使用默认配置,也可以通过loadConf 进行自定义配置

参数类型 参数名称 描述 默认值
Set<String> topicNames Sets.newTreeSet()
Pattern topicsPattern None
String subscriptionName None
SubscriptionType subscriptionType SubscriptionType.Exclusive
int receiverQueueSize 1000
long acknowledgementsGroupTimeMicros TimeUnit.MILLISECONDS.toMicros(100)
long negativeAckRedeliveryDelayMicros TimeUnit.MINUTES.toMicros(1)
int maxTotalReceiverQueueSizeAcrossPartitions 50000
String consumerName null
long ackTimeoutMillis 0
long tickDurationMillis 1000
int priorityLevel 0
ConsumerCryptoFailureAction cryptoFailureAction ConsumerCryptoFailureAction.FAIL
SortedMap<String, String> properties new TreeMap<>()
boolean readCompacted false
SubscriptionInitialPosition subscriptionInitialPosition SubscriptionInitialPosition.Latest
int patternAutoDiscoveryPeriod 1
RegexSubscriptionMode regexSubscriptionMode RegexSubscriptionMode.PersistentOnly
DeadLetterPolicy deadLetterPolicy None
boolean autoUpdatePartitions true
boolean replicateSubscriptionState false

示例

Consumer consumer = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .ackTimeout(10, TimeUnit.SECONDS)
    .subscriptionType(SubscriptionType.Exclusive)
    .subscribe();

Consumer接收方式

  • 异步接收

    CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

  • 批量接收

    Messages messages = consumer.batchReceive();
    for (message in messages) {
     // do something
    }
    consumer.acknowledge(messages)
    

自定义批量接收策略

Consumer consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .batchReceivePolicy(BatchReceivePolicy.builder()
        .maxNumMessages(100)
        .maxNumBytes(1024 * 1024)
        .timeout(200, TimeUnit.MILLISECONDS)
        .build())
        .subscribe();

默认批量接收策略

BatchReceivePolicy.builder()
  .maxNumMessage(-1)
  .maxNumBytes(10 * 1024 * 1024)
  .timeout(100, TimeUnit.MILLISECONDS)
.build();

多主题订阅

当consumer订阅pulsar的主题,默认情况下,它订阅了一个指定的主题,例如:persistent://public/default/my-topic。从Pulsar的1.23.0-incubating的版本,Pulsar消费者可以同时订阅多个topic。你可以用以下两种方式定义topic的列表:

通过正则订阅多主题时,所有的主题必须在同一个命名空间(namespace)

当订阅多主题时,pulsar客户端会自动调用Pulsar的API来发现匹配表达式的所有topic,然后全部订阅。如果此时有暂不存在的topic,那么一旦这些topic被创建,conusmer会自动订阅。

不能保证顺序性

当消费者订阅多主题时,pulsar所提供对单一主题订阅的顺序保证,就hold不住了。如果你在使用pulsar的时候,遇到必须保证顺序的需求,我们强烈建议不要使用此特性

正则表达式订阅

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
   .subscriptionName(subscription);
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer allTopicsConsumer = consumerBuilder
   .topicsPattern(allTopicsInNamespace)
  .subscribe();
// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer allTopicsConsumer = consumerBuilder
   .topicsPattern(someTopicsInNamespace)
   .subscribe();

列表订阅

List<String> topics = Arrays.asList(
    "topic-1",
   "topic-2",
    "topic-3"
);
Consumer multiTopicConsumer = consumerBuilder
   .topics(topics)
   .subscribe();
*// Alternatively:*
Consumer multiTopicConsumer = consumerBuilder
  .topics(
     "topic-1",
     "topic-2",
     "topic-3"
   )
  .subscribe();

异步订阅

Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
consumerBuilder
  .topics(topics)
   .subscribeAsync()
   .thenAccept(this::receiveMessageFromConsumer);
private void receiveMessageFromConsumer(Consumer consumer) {
  consumer.receiveAsync().thenAccept(message -> {
     // Do something with the received message
       receiveMessageFromConsumer(consumer);
});
}

订阅模式

Pulsar 提供了多样化的订阅模式来满足实际中的业务场景。

为了能更好的理解订阅多样化的模式,我们通过创建一个生产者,往指定主题名称为my-topic的主题发送10条消息,来进行分析对比。

发送示例:

Producer<String> producer = client.newProducer(Schema.STRING)

   .topic("my-topic")

   .enableBatching(false)

   .create();

// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"

producer.newMessage().key("key-1").value("message-1-1").send();

producer.newMessage().key("key-1").value("message-1-2").send();

producer.newMessage().key("key-1").value("message-1-3").send();

producer.newMessage().key("key-2").value("message-2-1").send();

producer.newMessage().key("key-2").value("message-2-2").send();

producer.newMessage().key("key-2").value("message-2-3").send();

producer.newMessage().key("key-3").value("message-3-1").send();

producer.newMessage().key("key-3").value("message-3-2").send();

producer.newMessage().key("key-4").value("message-4-1").send();

producer.newMessage().key("key-4").value("message-4-2").send();

Exclusive订阅模式

独占模式,只能有一个消费者绑定到订阅(subscription)上。如果多于一个消费者尝试以同样方式去订阅主题,消费者将会收到错误。

Consumer consumer = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Exclusive)
    .subscribe()

Failover订阅模式

灾备模式中,多个consumer可以绑定到同一个subscription。consumer将会按字典顺序排序,第一个consumer被初始化为唯一接受消息的消费者。这个consumer被称为master consumer。当master consumer断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个consumer。

Consumer consumer1 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
   .subscriptionType(SubscriptionType.Failover)
   .subscribe()
Consumer consumer2 = client.newConsumer()
    .topic("my-topic")
   .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Failover)
.subscribe()

运行结果

//conumser1 is the active consumer, consumer2 is the standby consumer.

//consumer1 receives 5 messages and then crashes, consumer2 takes over as an active consumer.

Shared 订阅模式

shared或者round robin模式中,多个消费者可以绑定到同一个订阅上。消息通过round robin轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。

Consumer consumer1 = client.newConsumer()
   .topic("my-topic")
   .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
    .subscribe()
Consumer consumer2 = client.newConsumer()
   .topic("my-topic")
   .subscriptionName("my-subscription")
   .subscriptionType(SubscriptionType.Shared)
    .subscribe()
//Both consumer1 and consumer 2 is active consumers.

运行结果

消费者1

("key-1", "message-1-1")

("key-1", "message-1-3")

("key-2", "message-2-2")

("key-3", "message-3-1")

("key-4", "message-4-1")

消费者2

("key-1", "message-1-2")

("key-2", "message-2-1")

("key-2", "message-2-3")

("key-3", "message-3-2")

("key-4", "message-4-2")

Key_Shared订阅模式

2.4.0 版本之后新扩展的订阅模式

Consumer consumer1 = client.newConsumer()
     .topic("my-topic")
     .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Key_Shared)
     .subscribe()
Consumer consumer2 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
     .subscriptionType(SubscriptionType.Key_Shared)
     .subscribe()
//Both consumer1 and consumer2 are active consumers.

运行结果

消费者1

("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-3", "message-3-1")
("key-3", "message-3-2")
 

消费者2

("key-2", "message-2-1")
("key-2", "message-2-2")
("key-2", "message-2-3")
("key-4", "message-4-1")
("key-4", "message-4-2")
 

Reader

使用Pulsar的 读取器接口, 应用程序可以手动管理游标。 当使用读取器连接到一个主题而非消费者时,在读取器连接到主题的时候就需要指定读取器从哪个位置开始读消息。当连接到某个主题时, 读取器从以下位置开始读消息:

  • ​ 主题中最早的可用消息
  • ​ 主题中最新可用消息
  • ​ 指定的消息ID

示例:

ReaderConfiguration conf = new ReaderConfiguration();
     byte[] msgIdBytes = *// Some message ID byte array*
      MessageId id = MessageId.fromByteArray(msgIdBytes);
   Reader reader = pulsarClient.newReader()
         .topic(topic)
         .startMessageId(id)
           .create();
     
      while (true) {
         Message message = reader.readNext();
        *// Process message*
      }

 

Reader 配置

Reader 配置

参数类型 参数名称 描述 默认值
String topicName None
int receiverQueueSize 1000
ReaderListener<T> readerListener None
String readerName null
String subscriptionRolePrefix null
CryptoKeyReader cryptoKeyReader null
ConsumerCryptoFailureAction cryptoFailureAction ConsumerCryptoFailureAction.FAIL
boolean readCompacted false
boolean resetIncludeHead false

reader范围读取策略

范围值区间0~65535,最大值不能超过65535

示例

pulsarClient.newReader()
     .topic(topic)
     .startMessageId(MessageId.earliest)
     .keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
     .create();

Schema

Pulsar应用中,如果开发者没有为topic指定schema,producer和consumer将会处理原始字节。但实际情况我们的期望不是这样的,我们期望能按自己的数据格式进行发送,我们需要支持不同的数据类型。 Message schemas 就能很好的支持这种业务场景。

Schema示例

public class SensorReading {
  public float temperature;
  public SensorReading(float temperature) {
    this.temperature = temperature;
  }
  // A no-arg constructor is required
  public SensorReading() {

  }
  public float getTemperature() {
   return temperature;
  }
  public void setTemperature(float temperature) {
    this.temperature = temperature;
  }

}
Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
    .topic("sensor-readings")
    .create();

目前java client支持shcema 如下

  • Schema.BYTES
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)

   .topic("some-raw-bytes-topic")

   .create();

//or

Producer<byte[]> bytesProducer = client.newProducer()

   .topic("some-raw-bytes-topic")

   .create();
  • Schema.STRING
Producer<String> stringProducer = client.newProducer(Schema.STRING)
   .topic("some-string-topic")
   .create();

 
  • Schema.JSON
Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))

   .topic("some-pojo-topic")

   .create();
  • Schema.PROTOBUF
Producer<MyProtobuf> protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class))

   .topic("some-protobuf-topic")

   .create(); 
  • Schema.AVRO
Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))

   .topic("some-avro-topic")

   .create();

身份验证与认证

Pulsar支持两种方式

  • TLS
  • Athenz

TLS 身份认证使用需要将setUseTls设置为true ,同时需要设置TLS cert 路径

示例

Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", "/path/to/client-cert.pem");
authParams.put("tlsKeyFile", "/path/to/client-key.pem");
Authentication tlsAuth = AuthenticationFactory
   .create(AuthenticationTls.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
   .serviceUrl("pulsar+ssl://my-broker.com:6651")
   .enableTls(true)
   .tlsTrustCertsFilePath("/path/to/cacert.pem")
    .authentication(tlsAuth)
   .build();

Athenz 需要设置TLS,同时需要初始化如下参数

  • tenantDomain
  • tenantService
  • providerDomain
  • privateKey

privateKey支持三种格式

  • file:///path/to/file
  • file:/path/to/file
  • data:application/x-pem-file;base64,<base64-encoded value>

示例

Map<String, String> authParams = new HashMap<>();
authParams.put("tenantDomain", "shopping"); // Tenant domain name
authParams.put("tenantService", "some_app"); // Tenant service name
authParams.put("providerDomain", "pulsar"); // Provider domain name
authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")
Authentication athenzAuth = AuthenticationFactory
   .create(AuthenticationAthenz.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
   .serviceUrl("pulsar+ssl://my-broker.com:6651")
   .enableTls(true)
   .tlsTrustCertsFilePath("/path/to/cacert.pem")
  .authentication(athenzAuth)
  .build();
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,049评论 5 473
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,478评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,109评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,097评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,115评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,280评论 1 279
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,748评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,398评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,553评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,440评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,487评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,176评论 3 317
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,750评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,821评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,049评论 1 257
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,559评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,150评论 2 341