php 使用 Kafka

发布于 2022-03-08 11:29 阅读 789

kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,如果遇到了消息在业务处理时出现异常,就会很难进行下一步处理。应对这种场景,需要自己实现消息重试的功能。

自己实现重试机制

kafka.png

cli

#停止
kafka-server-stop.sh
#前台启动
kafka-server-start.sh config/server.properties
#守护进程
kafka-server-start.sh -daemon config/server.properties
#新建名为test2的topic,包含2个分区,1个副本
kafka-topics.sh --bootstrap-server kafka:9092 --create --partitions 2 --replication-factor 1 --topic test2
#查看topic列表
kafka-topics.sh --bootstrap-server kafka:9092 --list
#查看topic详情
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic test2

#删除topic
kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic test2

#消费组
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
#消费组详情
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group test

#命令行生产者
kafka-console-producer.sh --broker-list kafka:9092 --topic test
a
b
c
CTRL + C 结束

#命令行消费者
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning

High-level consumer 和 Low-level consumer

从开放的功能服务看,highlevel的配置与使用相对简单,分布式管理功能由Kafka集群与zookeeper自行管理,消费者只要从头或者从最新处获取数据即可,服务重启,能从上次消费位置开始,leader故障,能够自行rebalance。而lowlevel更为为灵活也更复杂,这些都是开放给自己的应用来管理。

https://www.cnblogs.com/liliuguang/p/14627513.html

php扩展

# Dockerfile
RUN apt-get update && apt-get install -y librdkafka-dev
RUN pecl install rdkafka-6.0.0 \
    && docker-php-ext-enable rdkafka

kafka服务端,及web管理工具

https://github.com/chudaozhe/docker-kafka

https://blog.csdn.net/sz85850597/article/details/86749783

https://www.jianshu.com/p/9b83f612c300

广而告之,我的新作品《语音助手》上架Google Play了,欢迎下载体验