What it is
In short, Kafka is message middleware: producers publish messages and consumers read them.
Use cases

  1. Decoupling systems

Hands-on

Installing Apache Kafka on Ubuntu (Debian).
Apache Kafka depends on a Java runtime, so install Java first.

sudo apt update
sudo apt install default-jdk

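Download the Kafka archive from the official site (the command below uses a mirror) and unpack it; it works out of the box.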

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
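tar xzf kafka_2.12-2.3.0.tgz
# Rename the extracted directory so the kafka/ paths used in the commands below resolve
mv kafka_2.12-2.3.0 kafka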

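Kafka uses ZooKeeper, so start the ZooKeeper service first and then the Kafka broker. Both commands run in the foreground, so use a separate terminal for each.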

kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties
kafka/bin/kafka-server-start.sh kafka/config/server.properties
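
Once both services are up, a quick smoke test with the command-line tools bundled in the Kafka archive can confirm that the broker accepts and serves messages. The following is a minimal sketch, assuming the broker listens on localhost:9092 and Kafka lives in the kafka/ directory; the KAFKA_HOME, BROKER and TOPIC variables are names introduced here for illustration. It creates the topic "test" used by the PHP examples, sends one message, and reads it back.

```shell
#!/bin/sh
# Assumed locations and names; adjust to your setup.
KAFKA_HOME="${KAFKA_HOME:-kafka}"
BROKER="localhost:9092"
TOPIC="test"

if [ ! -x "$KAFKA_HOME/bin/kafka-topics.sh" ]; then
    echo "Kafka scripts not found under $KAFKA_HOME; skipping smoke test" >&2
else
    # Create a single-partition, unreplicated topic (enough for a local test).
    "$KAFKA_HOME/bin/kafka-topics.sh" --create --bootstrap-server "$BROKER" \
        --replication-factor 1 --partitions 1 --topic "$TOPIC"

    # Send one message through the console producer.
    echo "hello kafka" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
        --broker-list "$BROKER" --topic "$TOPIC"

    # Read one message from the start of the topic, then exit.
    "$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server "$BROKER" \
        --topic "$TOPIC" --from-beginning --max-messages 1
fi
```

If the consumer prints the message back, the broker is reachable and the topic is ready for the PHP client.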

PHP Kafka Client

sudo apt-get install librdkafka-dev
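
The package above provides only the librdkafka C library; the RdKafka classes used in the PHP examples come from the rdkafka PHP extension, which is built on top of it. The following sketch shows one possible way to install that extension from PECL and check that PHP loads it. The php-dev and php-pear package names and the two helper functions are assumptions for illustration, and the php.ini location depends on your PHP version.

```shell
#!/bin/sh
# Hypothetical helper: build and install the rdkafka extension from PECL.
# Assumes the Ubuntu packages php-dev and php-pear provide phpize and pecl.
install_rdkafka_ext() {
    sudo apt-get install -y php-dev php-pear
    sudo pecl install rdkafka
}

# Hypothetical helper: succeeds if PHP lists rdkafka among its loaded modules.
rdkafka_loaded() {
    php -m 2>/dev/null | grep -qi '^rdkafka$'
}

# After running install_rdkafka_ext, add "extension=rdkafka.so" to php.ini
# (php --ini shows which files are read), then verify:
if rdkafka_loaded; then
    echo "rdkafka extension is loaded"
else
    echo "rdkafka extension is not loaded yet"
fi
```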

Rdkafka documentation
Producer example

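<?php
// Create a producer and point it at the local broker.
$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1:9092");

$topic = $rk->newTopic("test");
while (true) {
    $message = date('Y-m-d H:i:s') . ' message ';
    // RD_KAFKA_PARTITION_UA leaves the partition unassigned so the partitioner
    // picks one; the second argument is the message flags (0 here).
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
    // Serve queued events without blocking (poll() is available in recent php-rdkafka versions).
    $rk->poll(0);
    sleep(1);
}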

Consumer example

<?php
$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
// Same broker as the producer; the port is spelled out for consistency.
$rk->addBrokers("127.0.0.1:9092");

$topic = $rk->newTopic("test");
// Read partition 0 from the earliest available offset.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
    // The first argument is the partition (again).
    // The second argument is the timeout in milliseconds.
    $msg = $topic->consume(0, 1000);
    if (null === $msg) {
        // No message arrived within the timeout; try again.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        continue;
    } else {
        echo $msg->payload, "\n";
    }
}