Ⅰ、kafka扩展安装参考
Ⅱ、代码
1、生产者
<?php
/**
 * Thin wrapper around the php-rdkafka producer.
 *
 * init() builds one producer and one topic handle that are stored statically
 * and therefore shared by every KafkaProducer instance; producer() publishes a
 * JSON-encoded message to that topic and then tries to flush it.
 */
class KafkaProducer
{
    /** Label for this component; it is not referenced inside the class. */
    const TAG = 'KafkaProducer';

    /** @var RdKafka\ProducerTopic Topic handle created by init(). */
    private static $topic;

    /** @var RdKafka\Producer Producer handle created by init(). */
    private static $producer;

    /**
     * Configures the shared producer/topic handles and returns a wrapper instance.
     *
     * @param string     $brokerList Value for the "metadata.broker.list" setting.
     * @param string     $topic      Name of the topic every message is produced to.
     * @param array|null $isSsl      When truthy, SASL is enabled and this array must hold
     *                               the 'username' and 'password' keys. The protocol and
     *                               mechanism come from the KAFKA_SECURITY_PROTOCOL and
     *                               KAFKA_SASL_MECHANISM constants, which are defined
     *                               outside this class.
     * @return KafkaProducer
     * @throws Exception When the rdkafka extension is not loaded.
     */
    public static function init($brokerList, $topic, $isSsl = null)
    {
        if (!extension_loaded('rdkafka')) {
            throw new Exception("no rdkafka extension", 1);
        }

        $conf = new RdKafka\Conf();
        // Conf::set() is documented with string values, so numbers are passed as strings.
        $conf->set('metadata.broker.list', $brokerList);
        $conf->set('broker.version.fallback', '1.0.0');
        $conf->set('compression.type', 'lz4');
        $conf->set('retries', '3');
        $conf->set('retry.backoff.ms', '1000');

        // Delivery report: successes are ignored, failures are written to the error
        // log so that a message rejected by the broker does not vanish silently.
        $conf->setDrMsgCb(function ($kafka, $message) {
            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) {
                self::writeLog(array(
                    'msg' => $message->payload,
                    'err_msg' => 'Delivery failed: ' . rd_kafka_err2str($message->err),
                ));
            }
        });
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            throw new Exception('推送失败:' . rd_kafka_err2str($err) . '。缘由:' . $reason);
        });

        if ($isSsl) {
            $conf->set('security.protocol', KAFKA_SECURITY_PROTOCOL);
            $conf->set('sasl.mechanism', KAFKA_SASL_MECHANISM);
            $conf->set('sasl.username', $isSsl['username']);
            $conf->set('sasl.password', $isSsl['password']);
        }

        self::$producer = new RdKafka\Producer($conf);
        self::$topic = self::$producer->newTopic($topic);

        return new KafkaProducer();
    }

    /**
     * JSON-encodes $message, produces it to the configured topic and flushes.
     *
     * Any exception raised on the way is written to the error log and rethrown.
     *
     * @param mixed $message Data to publish; it must be JSON-encodable.
     * @param int   $retries Maximum number of flush() attempts (10 s timeout each).
     * @throws Exception When encoding fails, init() has not been called, the
     *                   extension reports an error, or the flush never succeeds.
     */
    public function producer($message, $retries = 10)
    {
        try {
            // json_encode() returns false on failure (e.g. invalid UTF-8); publishing
            // that value would put a bogus payload on the topic.
            $payload = json_encode($message);
            if (false === $payload) {
                throw new \RuntimeException('Unable to JSON-encode message: ' . json_last_error_msg());
            }

            if (null === self::$topic || null === self::$producer) {
                throw new \RuntimeException('KafkaProducer::init() must be called before producer()');
            }

            self::$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
            self::$producer->poll(0);

            // The extension version deployed in production has no flush() method.
            if (!method_exists(self::$producer, 'flush')) {
                return;
            }

            $result = RD_KAFKA_RESP_ERR_NO_ERROR;
            for ($flushRetries = 0; $flushRetries < $retries; $flushRetries++) {
                $result = self::$producer->flush(10000);
                if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                    break;
                }
            }

            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
                throw new \RuntimeException('Was unable to flush, messages might be lost!');
            }
        } catch (Exception $e) {
            $this->log(array('msg' => json_encode($message), 'err_msg' => $e->getMessage()));
            throw $e;
        }
    }

    /**
     * Appends a timestamped var_export() dump of $message to the Kafka error log.
     *
     * @param mixed $message Value to record.
     */
    public function log($message)
    {
        self::writeLog($message);
    }

    /**
     * Writes one timestamped entry to ROOT_DIR . '/log/KafkaError.log'.
     * ROOT_DIR is a constant defined outside this class.
     *
     * @param mixed $message Value to record.
     */
    private static function writeLog($message)
    {
        $entry = PHP_EOL . "时间:" . date('Y-m-d H:i:s') . PHP_EOL . var_export($message, true) . PHP_EOL;
        error_log($entry, 3, ROOT_DIR . '/log/KafkaError.log');
    }
}
2、消费者
<?php
require_once dirname(__FILE__) . '/../constant.php';
/**
 * Thin wrapper around the php-rdkafka high-level consumer.
 *
 * init() builds one consumer that is stored statically and subscribed to a
 * single topic; consumer() fetches one message per call and hands it to a
 * callback.
 */
class KafkaConsumer
{
    /** @var RdKafka\KafkaConsumer Consumer handle created by init(). */
    private static $consumer;

    /**
     * Configures the shared consumer, subscribes it to $topic and returns a wrapper instance.
     *
     * @param string     $brokerList Value for the "metadata.broker.list" setting.
     * @param string     $groupId    Consumer group id ("group.id").
     * @param string     $topic      Topic to subscribe to.
     * @param array|null $isSsl      When truthy, SASL is enabled and this array must hold
     *                               the 'username' and 'password' keys. On the company
     *                               cluster every topic needs its own approved
     *                               user/password pair.
     * @return KafkaConsumer
     * @throws Exception When the rdkafka extension is not loaded.
     */
    public static function init($brokerList, $groupId, $topic, $isSsl = null)
    {
        if (!extension_loaded('rdkafka')) {
            throw new Exception("no rdkafka extension", 1);
        }

        $conf = new \RdKafka\Conf();

        // $partitions is left untyped: "array $partitions = null" relies on implicit
        // nullability, which newer PHP versions deprecate.
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    $kafka->assign($partitions);
                    break;
                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    $kafka->assign(null);
                    break;
                default:
                    // Report a readable message and keep the numeric code available.
                    throw new \Exception(rd_kafka_err2str($err), $err);
            }
        });

        if ($isSsl) {
            $conf->set('security.protocol', KAFKA_SECURITY_PROTOCOL);
            $conf->set('sasl.mechanism', KAFKA_SASL_MECHANISM);
            $conf->set('sasl.username', $isSsl['username']);
            $conf->set('sasl.password', $isSsl['password']);
        }

        $conf->setErrorCb(function ($kafka, $err, $reason) {
            throw new Exception('消费失败:' . rd_kafka_err2str($err) . '。缘由:' . $reason);
        });

        // Conf::set() is documented with string values, so every setting is passed as a string.
        $conf->set('group.id', $groupId);
        $conf->set('metadata.broker.list', $brokerList);
        // Topic metadata refresh interval in milliseconds; -1 would disable the periodic refresh.
        $conf->set('topic.metadata.refresh.interval.ms', '60000');
        // Enable TCP keep-alive on broker sockets.
        $conf->set('socket.keepalive.enable', 'true');
        $conf->set('auto.commit.interval.ms', '100');
        // Alternative value noted by the original author: "smallest".
        $conf->set('auto.offset.reset', 'latest');

        self::$consumer = new \RdKafka\KafkaConsumer($conf);
        self::$consumer->subscribe([$topic]);

        return new KafkaConsumer();
    }

    /**
     * Waits up to 120 seconds for one message and passes it to $callback.
     *
     * Every non-success outcome is thrown as an Exception, including partition
     * EOF and timeout. The exception code carries the rdkafka error code so
     * callers can tell those idle conditions apart from real failures. Each
     * exception is written to the error log before it is rethrown.
     *
     * @param callable|null $callback Invoked with the consumed message on success.
     * @throws Exception On EOF, timeout, any rdkafka error, or when init() was not called.
     */
    public static function consumer($callback = null)
    {
        try {
            if (null === self::$consumer) {
                throw new \RuntimeException('KafkaConsumer::init() must be called before consumer()');
            }

            $message = self::$consumer->consume(120 * 1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    if ($callback) {
                        call_user_func($callback, $message);
                    }
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    throw new \Exception("No more messages; will wait for more\n", $message->err);
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    throw new \Exception("Timed out\n", $message->err);
                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        } catch (Exception $e) {
            $entry = PHP_EOL . "时间:" . date('Y-m-d H:i:s') . PHP_EOL . var_export($e->getMessage(), true) . PHP_EOL;
            error_log($entry, 3, ROOT_DIR . '/log/KafkaError.log');
            throw $e;
        }
    }
}
3、生产者调用
// Demo: publish ten sample messages to the "test_cluster" topic.
// Any exception raised during setup or publishing ends the run and its message is dumped.
try {
    $brokerList = '127.0.0.1:9092';
    $credentials = array('username' => 'writer', 'password' => '123456');
    $producer = KafkaProducer::init($brokerList, 'test_cluster', $credentials);

    $i = 0;
    while ($i < 10) {
        // The second argument (2) is the $retries value passed to producer().
        $payload = array('bc' => 201700, 'numd' => $i);
        $producer->producer($payload, 2);
        $i++;
    }
} catch (Exception $e) {
    var_dump($e->getMessage());
}
4、消费者调用
// Demo: subscribe to "test_cluster" as group "test-consumer-group" and dump every message.
// The loop has no exit condition of its own; it ends when consumer() throws,
// at which point the exception message is dumped.
try {
    $brokerList = '127.0.0.1:9092';
    $credentials = array('username' => 'reader', 'password' => '123456');
    $kc = KafkaConsumer::init($brokerList, 'test-consumer-group', 'test_cluster', $credentials);

    $dumpMessage = function ($message) {
        var_dump($message);
    };

    for (;;) {
        $kc->consumer($dumpMessage);
    }
} catch (Exception $e) {
    var_dump($e->getMessage());
}