Kafka

Basic usage

  1. Create a topic (on Kafka 3.0 and later, where the --zookeeper option was removed, pass --bootstrap-server localhost:9092 instead)
    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  2. Start a consumer
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  3. Start a message producer
    kafka-console-producer.sh --broker-list localhost:9092 --topic test
  4. List the consumer groups
    kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
  5. Show the consumption status (offsets and lag) of a group
    kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group 5f575fb319a4d7.98342076

Why middleware is needed

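Imagine a scenario: once your create-order operation has finished creating the order, it has to trigger a series of follow-up actions, such as updating the user's order statistics, sending the user an SMS, sending the user an email, and so on.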

createOrder(...) {
    ...
    statOrderData(...);  // update the user's order statistics
    sendSMS();           // send the user an SMS
    sendEmail();         // send the user an email
}
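
As a rough illustration of where middleware fits into this scenario, here is a minimal Python sketch in which the order code publishes a single event and a separate consumer runs the follow-up actions. It is only a sketch under stated assumptions: an in-process `queue.Queue` stands in for a Kafka topic, and the names `order_events`, `handled`, `create_order` and `consume` are hypothetical, not part of any Kafka client.

```python
import queue
import threading

# Assumption: an in-process queue stands in for a Kafka topic.
order_events = queue.Queue()
handled = []  # records which follow-up actions ran, for illustration only


def create_order(order_id):
    """Create the order, then publish one event instead of calling each follow-up."""
    # Persisting the order itself is omitted in this sketch.
    order_events.put({"type": "order_created", "order_id": order_id})
    return order_id


def stat_order_data(event):
    handled.append(("stat", event["order_id"]))


def send_sms(event):
    handled.append(("sms", event["order_id"]))


def send_email(event):
    handled.append(("email", event["order_id"]))


def consume(handlers):
    """Consumer loop: runs the follow-up actions off the order-creation path."""
    while True:
        event = order_events.get()
        if event is None:  # sentinel value: stop consuming
            break
        for handler in handlers:
            handler(event)


worker = threading.Thread(
    target=consume, args=([stat_order_data, send_sms, send_email],)
)
worker.start()
create_order(1001)
order_events.put(None)  # tell the consumer to stop
worker.join()
print(handled)
```

With this shape, `create_order` no longer needs to know which follow-up actions exist, and a slow or failing SMS or email step does not block order creation. With a real Kafka topic, each downstream service would typically consume under its own consumer group so that every service receives every order event.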

Q&A

Q: Fatal error during KafkaServer startup. Prepare to shutdown

[2021-07-30 03:57:28,720] ERROR Error while creating ephemeral at /brokers/ids/1, node already exists and owner '72057740085755904' does not match current session '72108709488295936' (kafka.zk.KafkaZkClient$CheckedEphemeral)
[2021-07-30 03:57:28,734] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1837)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1775)
    at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1742)
    at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:312)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)

Q: Unable to consume messages (does renaming the group-id fix it?)

[2021-08-03 06:41:21,468] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group group-a in Stable state. Created a new member id server@jianwei.lan (github.com/segmentio/kafka-go)-327d2a28-455f-4aa5-bd9e-db94d9e72e2c for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2021-08-03 06:41:21,468] INFO [GroupCoordinator 1]: Preparing to rebalance group group-a in state PreparingRebalance with old generation 64 (__consumer_offsets-47) (reason: Adding new member server@jianwei.lan (github.com/segmentio/kafka-go)-327d2a28-455f-4aa5-bd9e-db94d9e72e2c with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2021-08-03 06:41:47,231] INFO [GroupCoordinator 1]: Member server@jianwei.lan (github.com/segmentio/kafka-go)-874e4ff5-4842-4730-b4a6-e581dbd43567 in group group-a has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-08-03 06:41:47,233] INFO [GroupCoordinator 1]: Stabilized group group-a generation 65 (__consumer_offsets-47) (kafka.coordinator.group.GroupCoordinator)
[2021-08-03 06:41:47,270] INFO [GroupCoordinator 1]: Assignment received from leader for group group-a for generation 65 (kafka.coordinator.group.GroupCoordinator)

Official resources

Chinese manual
Configuration reference
Command-line operations
PHP client

Related resources

kafka docker-compose
es-docker-compose
Getting started with Kafka

A concise Kafka tutorial

Tools

Kafka visualization UI

Kafka visualization UI: basic usage
