php使用kafka

Ⅰ、kafka扩展安装参考

Ⅱ、代码
1、生产者

<?php
class KafkaProducer
{
    const TAG = 'KafkaProducer';

    /** @var RdKafka\ProducerTopic */
    private static $topic;

    /** @var  RdKafka\Producer */
    private static $producer;

    /**
     * @param string $brokerList
     * @param string $topic
     * @param array $isSsl 
     * @return KafkaProducer
     * @throws Exception
     */
    public static function init($brokerList, $topic, $isSsl = null)
    {
        if (!extension_loaded('rdkafka')) {
            throw new Exception("no rdkafka extension", 1);
        }

        $conf = new RdKafka\Conf();

        $conf->set('metadata.broker.list', $brokerList);
        $conf->set('broker.version.fallback', '1.0.0');
        $conf->set('compression.type', 'lz4');
        $conf->set("retries", 3);
        $conf->set("retry.backoff.ms", 1000);

        $conf->setDrMsgCb(function ($kafka, $message) {
//            static::log('推送成功:' . var_export($message, true));
        });
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            throw new Exception('推送失败:' . rd_kafka_err2str($err) . '。缘由:' . $reason);
        });

        if ($isSsl) {
            $conf->set('security.protocol', KAFKA_SECURITY_PROTOCOL);
            $conf->set('sasl.mechanism', KAFKA_SASL_MECHANISM);
            $conf->set('sasl.username', $isSsl['username']);
            $conf->set('sasl.password', $isSsl['password']);
        }

        static::$producer = new RdKafka\Producer($conf);
        static::$topic = static::$producer->newTopic($topic);

        return new KafkaProducer();
    }

    /**
     * @param $message
     * @param int $retries
     * @throws Exception
     */
    public function producer($message, $retries = 10)
    {
        try {
            static::$topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));

            static::$producer->poll(0);

            // 线上版本不支持flush方法
            if (!method_exists(static::$producer, 'flush')) {
                return;
            }

            $result = RD_KAFKA_RESP_ERR_NO_ERROR;

            for ($flushRetries = 0; $flushRetries < $retries; $flushRetries++) {
                $result = static::$producer->flush(10000);
                if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                    break;
                }
            }

            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
                throw new \RuntimeException('Was unable to flush, messages might be lost!');
            }
        } catch (Exception $e) {
            $this->log(array('msg' => json_encode($message), 'err_msg' => $e->getMessage()));
            throw $e;
        }
    }

    public function log($message)
    {
        $message = PHP_EOL . "时间:" . date('Y-m-d H:i:s') . PHP_EOL . var_export($message, true) . PHP_EOL;
        error_log($message, 3, ROOT_DIR . '/log/KafkaError.log');
    }
}

2、消费者

<?php
<?php
require_once dirname(__FILE__) . '/../constant.php';

class KafkaConsumer
{
    /** @var RdKafka\ConsumerTopic */
    private static $topic;

    /** @var  RdKafka\KafkaConsumer */
    private static $consumer;

    /**
     * @param string $brokerList
     * @param string $groupId
     * @param string $topic
     * @param array $isSsl 公司集群每个topic都得申请认证用户和密码
     * @return KafkaConsumer
     * @throws Exception
     */
    public static function init($brokerList, $groupId, $topic, $isSsl = null)
    {
        if (!extension_loaded('rdkafka')) {
            throw new Exception("no rdkafka extension", 1);
        }
        $conf = new \RdKafka\Conf();
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    $kafka->assign(NULL);
                    break;
                default:
                    throw new \Exception($err);
            }
        });
        if ($isSsl) {
            $conf->set('security.protocol', KAFKA_SECURITY_PROTOCOL);
            $conf->set('sasl.mechanism', KAFKA_SASL_MECHANISM);
            $conf->set('sasl.username', $isSsl['username']);
            $conf->set('sasl.password', $isSsl['password']);
        }
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            throw new Exception('消费失败:' . rd_kafka_err2str($err) . '。缘由:' . $reason);
        });
        $conf->set('group.id', $groupId);
        $conf->set('metadata.broker.list', $brokerList);
        $conf->set('topic.metadata.refresh.interval.ms', 60000);    // Topic metadata 刷新间隔,毫秒。metadata 自动刷新错误和连接。设置为 -1 关闭刷新间隔。
        $conf->set('socket.keepalive.enable', true);          // Broker sockets 允许 TCP 保持活力

        $conf->set('auto.commit.interval.ms', 100);
        $conf->set('auto.offset.reset', 'latest');//smallest

        static::$consumer = new \RdKafka\KafkaConsumer($conf);
        static::$consumer->subscribe([$topic]);

        return new KafkaConsumer();
    }

    /**
     * @param callable|null $callback
     * @throws Exception
     */
    public static function consumer($callback = null)
    {
        try {
            $message = static::$consumer->consume(120 * 1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    if ($callback)
                        call_user_func($callback, $message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    throw new \Exception("No more messages; will wait for more\n");
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    throw new \Exception("Timed out\n");
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        } catch (Exception $e) {
            $message = PHP_EOL . "时间:" . date('Y-m-d H:i:s') . PHP_EOL . var_export($e->getMessage(), true) . PHP_EOL;
            error_log($message, 3, ROOT_DIR . '/log/KafkaError.log');
            throw $e;
        }
    }
}

3、生产者调用

try {
    $brokerList = '127.0.0.1:9092';
    $producer = KafkaProducer::init($brokerList, 'test_cluster',
        array('username' => 'writer', 'password' => '123456'));
    for ($i = 0; $i < 10; $i++) {
        $producer->producer(array('bc' => 201700, 'numd' => $i), 2);
    }
} catch (Exception $e) {
    var_dump($e->getMessage());
}

4、消费者调用

try {
    $brokerList = '127.0.0.1:9092';
    $kc = KafkaConsumer::init(
        $brokerList,
        'test-consumer-group',
        'test_cluster',
        array('username' => 'reader', 'password' => '123456')
    );
    while (true) {
        $kc->consumer(function ($message) {
            var_dump($message);
        });
    }
} catch (Exception $e) {
    var_dump($e->getMessage());
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容