php 使用kafka

2021-01-07 14:28

阅读:1022

标签:alt   bootstra   校验   oob   生效   property   -o   keyword   blank   

准备工作

安装librdkafka 库

git clone https://github.com/edenhill/librdkafka.git
 ./configure
 make
 sudo make install

安装php-rdkafka 扩展

$ git clone https://github.com/arnaud-lb/php-rdkafka.git
 
#生成configure文件
$ /Users/shiyibo/LNMP/php/bin/phpize 
 
#编译安装
$ ./configure --with-php-config=/Users/shiyibo/LNMP/php/bin/php-config
$ make
$ make install 
 
#在php.ini 文件中配置 rdkafka扩展
$ vim /Users/shiyibo/LNMP/php/etc/php.ini
extension=rdkafka.so
 
#查看扩展是否生效
$php -m | grep kafka

编码实现

生产者

  • 代码实现
/**
 * Created by PhpStorm.
 * User: shiyibo
 * Date: 2019/2/24
 * Time: 12:57 PM
 */

/**
 * 消息生产者
 *
 * 实现的例子来源于:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafka\Producer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("test");

// 从终端接收输入 
$oInputHandler = fopen(‘php://stdin‘, ‘r‘);

while (true) {
    echo "\nEnter  messages:\n";
    $sMsg = trim(fgets($oInputHandler));

   // 空消息意味着退出
    if (empty($sMsg)) {
        break;
    }

    // 发送消息
    $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}

echo "done\n";
  • 检验发送是否成功

终端开启一个消费者:

# 因为生产者会往test的topic中发送消息,消费者直接消费test即可
kafka-console-consumer --bootstrap-server localhost:9092 --topic test

生产者端发送:
技术图片


消费者端接收:

技术图片

消费者

  • 代码实现
/**
 * 消费者消费消息
 *
 * 实现的例子来源于:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafka\Consumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("test");

/**
 * consumeStart
 *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
 *   第二个参数标识从什么位置开始拉取消息,可选值为
 *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
 *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
        usleep(10000);
        continue;
    }

    if ($oMsg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $oMsg->payload, "\n";
    }
}

  • 校验
    生产者端发送:

    技术图片
     

消费者端接收:

 
技术图片
 



php 使用kafka

标签:alt   bootstra   校验   oob   生效   property   -o   keyword   blank   

原文地址:https://www.cnblogs.com/myJuly/p/13564242.html


评论


亲,登录后才可以留言!