Pulsar-安装部署

Pulsar的起源

Pulsar是由雅虎创建的开源的、分布式pub-sub系统,现在是Apache基金会的一个孵化项目。

Pulsar的关键特性

Pulsar的关键特性如下表所示:

关键特性 描述
Pulsar函数 使用对开发人员友好的API,可以轻松部署轻量级计算逻辑,无需运行自己的流处理引擎。
生产环境已证明 Pulsar已经在雅虎规模的生产环境中运行了3年多,每秒有数百万条消息涉及数百万个主题。
水平扩展 Pulsar集群支持无缝水平扩展到数百个节点。
低延迟、支持持久存储 Pulsar设计用于大规模的低延迟发布(<5ms),具有强大的可用性保证。
跨域复制 专为跨多个地理区域的数据中心之间的配置数据复制而设计。
多租户 原生支持多租户,支持租户间的隔离,身份验证,授权和配额管理。
持久存储 基于Apache BookKeeper的持久消息存储。支持读写之间的IO隔离。
丰富的客户端 Pulsar使用灵活的消息传递模型,支持Java,C ++,Python和Go。
可操作性 提供用于配置,管理,工具和监视的管理API,支持部署在裸机或Kubernetes上。

Pulsar集群搭建

温馨提示

  1. 单集群的Pulsar实例可以满足绝大多数学习、开发、验证需求,如果没有特殊的需要,建议使用单集群Pulsar实例,多集群的安装部署参考多集群部署
  2. 在部署Pulsar过程中,如需要使用所有内置的Pulsar IO 连接器,需要下载apache-pulsar-io-connectors,并安装到Pulsar的connectors目录下。

Pulsar集群的部署步骤如下:

  • 部署ZooKeeper集群(可选)
  • 初始化集群元信息
  • 部署BookKeeper集群
  • 部署一个或多个Pulsar broker

安装前准备

如果你已经有一个ZooKeeper集群,并且愿意重用该集群,则无需准备安装ZooKeeper集群的资源,也无需部署ZooKeeper集群

  1. 至少6台Linux机器或虚拟机
  • 1-1 三台机器用于部署ZooKeeper集群,Pulsar仅会定期使用ZooKeeper进行协调和配置任务,业务操作不依赖ZooKeeper集群,部署时可以使用性能规格较低的机器。
  • 1-2 三台部署BookKeeper集群和Pulsar broker。Puslar集群实际承载业务,建议使用性能规格更高的机器,比如计算能力更强的CPU、10Gbps NIC、SSD硬盘或高性能存储。
  1. 覆盖所有节点的DNS名称,如果没有DNS服务器可以通过hosts文件实现。
  2. 所有的机器需要安装Java 8或更高版本


    pulsar基本配置

本文安装部署Pulsar集群的节点信息

节点 规格 部署组件 主机名/地址
Pulsar-zk-01 4vCPU,8G内存 ZooKeeper集群 kwe1000854790.novalocal/205.20.107.25
Pulsar-zk-02 4vCPU,8G内存 ZooKeeper集群 kwe1000853508.novalocal/205.20.107.21
Pulsar-zk-03 4vCPU,8G内存 ZooKeeper集群 kwe1000853507.novalocal/205.20.107.22
Pulsar-bk-01 8vCPU,16G内存 BookKeeper集群 kwe1000853505.novalocal/205.20.107.20
Pulsar-bk-02 8vCPU,16G内存 BookKeeper集群 kwe1000853504.novalocal/205.20.107.19
Pulsar-bk-03 8vCPU,16G内存 BookKeeper集群 kwe1000853503.novalocal/205.20.107.24

安装Pulsar

集群中的每个节点都需要安装Pulsar二进制包,包括ZooKeeper和BookKeeper节点。

  • 获取安装包
    Pulsar

  • 将软件包拷贝到/opt目录下解压,并将解压的目录重命名为pulsarCluster

    tar -zxvf apache-pulsar-2.1.1-incubating-bin.tar.gz
    mv apache-pulsar-2.1.1-incubating pulsarCluster· 
    

确保/opt目录有足够的磁盘空间,或使用其他目录安装

Pulsar的目录结构如下表:

目录 内容
bin Pulsar的命令行工具
conf Pulsar的配置文件
data 存储ZooKeeper和BookKeeper数据
lib Pulsar使用的第三方库
logs 日志存储路径
examples Pulsar提供的样例

安装Pulsar 连接器(可选)

从2.1.0-inclubating版本开始,Pulsar单独发布了包含所有内置连接器的二进制包,如果想使用这些内置的连接器,可以参考下面的步骤安装,如果不需要可以直接跳过。

  • 获取软件包
    Pulsar IO Connectors

  • 解压软件包,并将connector目录拷贝到Pulsar安装目录(/opt/puslarCluster)

    tar -zxvf apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz
    cd apache-pulsar-io-connectors-2.1.1-incubating
    cp -r connectors/ /opt/pulsarCluster
    

部署ZooKeeper集群

  • 获取ZooKeeper安装包
    ZooKeeper

  • 将安装包拷贝到三个节点的/opt目录解压

    tar -zxvf zookeeper-3.4.12.tar.gz
    
  • 修改 conf/zookeeper.conf配置,增加ZooKeeper集群节点信息。

    server.1=205.20.107.25:2888:3888
    server.2=205.20.107.21:2888:3888
    server.3=205.20.107.22:2888:3888
    
    cd /opt/pulsarCluster
    echo “server.1=205.20.107.25:2888:3888” >> conf/zookeeper.conf
    echo “server.1=205.20.107.21:2888:3888” >> conf/zookeeper.conf
    echo “server.1=205.20.107.22:2888:3888” >> conf/zookeeper.conf
    
  • 配置ZooKeeper myid信息

    cd /opt/pulsarCluster
    mkdir -p data/zookeeper
    echo 1 > data/zookeeper/myid
    
  • 以守护进程启动ZooKeeper

    cd /opt/pulsarCluster/bin
    ./pulsar-daemon start zookeeper
    

配置集群信息

部署完ZooKeeper集群后,需要将一些Pulsar集群的元信息写入ZooKeeper集群的每个节点,由于数据在ZooKeeper集群内部会互相同步,因此只需要将元信息写入ZooKeeper的一个节点。可以在ZooKeeper集群的任意节点通过pulsar工具的 initialize-cluster-metadata方法配置数据,配置命令只需执行一次,否则ZooKeeper会报节点已经存在的错误。命令的一个简单样例如下:

$ ./pulsar initialize-cluster-metadata \
  --cluster pulsar-cluster-zk-1 \
  --zookeeper 205.20.107.25:2181 \
  --configuration-store 205.20.107.25:2181 \
  --web-service-url http://pulsar.cluster.com:8080 \
  --web-service-url-tls https://pulsar.cluster.com:8443 \
  --broker-service-url pulsar://pulsar.cluster.com:6650 \
  --broker-service-url-tls pulsar+ssl://pulsar.cluster.com:6651

在本文的安装部署过程中,Pulsar集群的名称为pulsar-cluster,统一域名pulsar.cluster.com。命令参数的具体含义如下:

Flag Description
--cluster 集群名称
--zookeeper ZooKeeper集群连接参数,仅需要包含集群中的一个节点即可
--configuration-store Pulsar实例的配置存储集群(ZooKeeper),和-zookeeper参数一样只需要包含集群中的一个节点即可
--web-service-url 集群Web服务的URL+端口,URL必须是一个i标准的DNS名称,默认端口8080,不建议修改。
--web-service-url-tls 集群Web提供TLS服务的URL+端口,端口默认8443,不建议修改。
--broker-service-url 集群brokers服务URL,URL中DNS的名称和Web服务保持一致,URL使用pulsar替代http/http,端口默认6650,不建议修改。
--broker-service-url-tls 集群brokers提供TLS服务的URL,默认端口6551,不建议修改。

部署BookKeeper集群

  • 配置BookKeeper集群
    Pulsar集群中所有持久数据的存储都由BookKeeper负责,因此如果想使用Pulsar需要部署一个BookKeeper集群,建议部署一个包含3个bookie节点的BookKeeper集群。BookKeeper集群的配置使用conf/bookkeeper.conf文件,BookKeeper最终的配置是配置ZooKeeper集群的地址,一个具体的配置例子如下:
    zkServers=205.20.107.25:2181,205.20.107.21:2181,205.20.107.22:2181
    
    Pulsar从2.1.0版本开始引入了有状态函数,如果想使用该功能,还需要在conf/bookkeeper.conf文件中增加如下配置:
    extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
    
  • 启动BooKeeper集群
    • 后台进程启动
      cd /opt/pulsarCluster/bin
      ./pulsar-daemon start bookie
      
    • 前台进程启动
      cd /opt/pulsarCluster/bin
      ./bookkeeper bookie
      
  • 检查BookKeeper集群状态
     cd /opt/pulsarCluster/bin
     ./bookkeeper shell bookiesanity
    
    如果BookKeeper成功运行,输出的最后一行结果为:
    org.apache.bookkeeper.bookie.BookieShell - Bookie sanity test succeeded
    

部署Pulsar brokers

  • 配置pulsar brokers:在conf/broker.conf中配置ZooKeeper集群、配置存储集群、集群名称、开始函数功能
     zookeeperServers=205.20.107.25:2181,205.20.107.21:2181,205.20.107.22:2181
     configurationStoreServers=205.20.107.25:2181,205.20.107.21:2181,205.20.107.22:2181
     clusterName=pulsar-cluster
     functionsWorkerEnabled=true
    
  • 配置function worker 集群名(conf/functions_worker.yml)
    ulsarFunctionsCluster=pulsar-cluster
    
  • 启动 brokers
    • 后台进程启动
      $ cd /opt/pulsarCluster/bin
      $ ./pulsar-daemon start broker
      
    • 前台进程启动
      $ cd /opt/pulsarCluster/bin
      $ ./bookkeeper broker
      

使用Pulsar发布、订阅消息(Java)

安装Pulsar Java客户端

如果使用Maven,在工程的pom文件增加如下配置:

<!-- 在<properties> 块中增加版本号信息 -->
<pulsar.version>2.1.1-incubating</pulsar.version>

<!--增加依赖 -->
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>${pulsar.version}</version>
</dependency>

构造Client

  • Pular连接 URLs
    • 本地Pulsar
      pulsar://localhost:6650
      
    • 生成环境Pulsar集群(使用域名)
      pulsar://pulsar.cluster.com:6650
      
    • 开启TLS的条件下,Pulsar集群连接URLs
      pulsar+ssl://pulsar.cluster.com:6651
      
  • 配置Pulsar客户端
    客户端的配置主要包括:Pulsar集群信息配置、鉴权信息配置、TLS配置、线程数连接数配置等。具体的配置可以参考Pulsar Client配置
    基于本文搭建的集群,只保留最简单的配置信息,Pulsar Client的构造如下:
          private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
          PulsarClient client = PulsarClient.builder()
              .serviceUrl(SERVER_URL)
              .enableTcpNoDelay(true)
              .build();
    

构造生产者

一个完整的生产者样例如下:


import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ProducerDemm {
    private static final Logger log = LoggerFactory.getLogger(ProducerDemm.class);
    private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
    public static void main(String[] args) throws Exception {
        // 构造Pulsar Client
        PulsarClient client = PulsarClient.builder()
            .serviceUrl(SERVER_URL)
            .enableTcpNoDelay(true)
            .build();
        // 构造生产者
        Producer<String> producer = client.newProducer(Schema.STRING)
            .producerName("my-producer")
            .topic("persistent://public/default/my-topic")
            .batchingMaxMessages(1024)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            .enableBatching(true)
            .blockIfQueueFull(true)
            .maxPendingMessages(512)
            .sendTimeout(10, TimeUnit.SECONDS)
            .blockIfQueueFull(true)
            .create();
        // 同步发送消息
        MessageId messageId = producer.send("Hello World");
        log.info("message id is {}",messageId);
        CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message");
        // 阻塞线程,直到返回结果
        log.info("async message id is {}",asyncMessageId.get());

        // 配置发送的消息元信息,同步发送
        producer.newMessage()
            .key("my-message-key")
            .value("my-message")
            .property("my-key", "my-value")
            .property("my-other-key", "my-other-value")
            .send();
        producer.newMessage()
            .key("my-async-message-key")
            .value("my-async-message")
            .property("my-async-key", "my-async-value")
            .property("my-async-other-key", "my-async-other-value")
            .sendAsync();
        
        // 关闭producer的方式有两种:同步和异步
        // producer.closeAsync();
        producer.close();
        
        // 关闭licent的方式有两种,同步和异步
        // client.close();
        client.closeAsync();
        
    }
}

构造消费者

  • 单订阅

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class MyConsumer {
   private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
   private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";

   public static void main(String[] args) throws Exception {
       // 构造Pulsar Client
       PulsarClient client = PulsarClient.builder()
           .serviceUrl(SERVER_URL)
           .enableTcpNoDelay(true)
           .build();
       Consumer consumer = client.newConsumer()
           .consumerName("my-consumer")
           .topic("persistent://public/default/my-topic")
           .subscriptionName("my-subscription")
           .ackTimeout(10, TimeUnit.SECONDS)
           .maxTotalReceiverQueueSizeAcrossPartitions(10)
           .subscriptionType(SubscriptionType.Exclusive)
           .subscribe();
       do {
           // 接收消息有两种方式:异步和同步
           // CompletableFuture<Message<String>> message = consumer.receiveAsync();
           Message message = consumer.receive();
           log.info("get message from pulsar cluster,{}", message);
       } while (true);
   }
}
  • 多订阅

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class MultiConsumer {
    private static final Logger log = LoggerFactory.getLogger(MultiConsumer.class);
    private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
    private static final String DEFAULT_NS_TOPICS = "persistent://public/default/.*";
    private static final String DEFATULT_NS_REG_TOPICS= "persistent://public/default/my.*";
    private static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
            .serviceUrl(SERVER_URL)
            .enableTcpNoDelay(true)
            .build();
        ConsumerBuilder consumerBuilder = client.newConsumer()
            .subscriptionName("multi-sub");

        // 订阅namespace下所有的topic
        Pattern allTopicsInNamespace = Pattern.compile(DEFAULT_NS_TOPICS);
        consumerBuilder.topicsPattern("").subscribe();

        // 订阅namespace下满足正则匹配的topic
        Pattern someTopicsInNamespace = Pattern.compile(DEFATULT_NS_REG_TOPICS);
        Consumer allTopicsConsumer = consumerBuilder
            .topicsPattern(someTopicsInNamespace)
            .subscribe();

        List<String> topics = Arrays.asList(
            "topic-1",
            "topic-2",
            "topic-3"
        );

        Consumer multiTopicConsumer = consumerBuilder
            .topics(topics)
            .subscribe();
        
    }
}

Pulsar美中不足

  • 不支持Windows平台安装部署,这对多数使用Windows的开发者来说调试成本较高。
  • SDK支持不足,Go,C++版本的SDK对Windows平台的支持还不够完美。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容