Basic usage
- Create a topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- Start a console consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
- Start a console producer
kafka-console-producer.sh --broker-list localhost:9092 --topic test
- List consumer groups
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
- Check a consumer group's consumption status
kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group 5f575fb319a4d7.98342076
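Why middleware is needed
Imagine a scenario: once your create-order operation has finished creating the order, it has to trigger a series of other operations, such as updating the user's order statistics, sending the user an SMS, sending the user an email, and so on.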
createOrder(...){
...
statOrderData(...);
sendSMS();
sendEmail();
}
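A minimal sketch of how a message queue decouples this flow. It uses Python's in-process `queue.Queue` as a stand-in for a Kafka topic, which is an assumption made only so the example runs without a broker. The handler names `stat_order_data`, `send_sms`, and `send_email` are hypothetical and simply mirror the calls above. The point is that `create_order` publishes one event and returns, while the follow-up work happens on the consuming side.

```python
import queue
import threading

# In-process stand-in for a broker topic (assumption: a real system would use Kafka).
order_events = queue.Queue()
handled = []  # records which follow-up actions ran, for demonstration only


def create_order(order_id):
    """Create the order, then publish one event instead of calling each follow-up."""
    # ... persist the order here ...
    order_events.put({"type": "order_created", "order_id": order_id})
    return order_id


# Hypothetical follow-up handlers, mirroring statOrderData / sendSMS / sendEmail.
def stat_order_data(event):
    handled.append(("stat", event["order_id"]))


def send_sms(event):
    handled.append(("sms", event["order_id"]))


def send_email(event):
    handled.append(("email", event["order_id"]))


HANDLERS = [stat_order_data, send_sms, send_email]


def consume():
    """Drain events until the None sentinel arrives, running every handler per event."""
    while True:
        event = order_events.get()
        if event is None:
            break
        for handler in HANDLERS:
            handler(event)


worker = threading.Thread(target=consume)
worker.start()
create_order(42)
order_events.put(None)  # sentinel: tell the consumer to stop
worker.join()
print(handled)  # [('stat', 42), ('sms', 42), ('email', 42)]
```

With a real broker, each follow-up would more likely be its own consumer in a separate consumer group, so a slow or failing SMS service would not hold up order creation or the other consumers.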
Q&A
Q: Fatal error during KafkaServer startup. Prepare to shutdown
[2021-07-30 03:57:28,720] ERROR Error while creating ephemeral at /brokers/ids/1, node already exists and owner '72057740085755904' does not match current session '72108709488295936' (kafka.zk.KafkaZkClient$CheckedEphemeral)
[2021-07-30 03:57:28,734] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1837)
at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1775)
at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1742)
at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
at kafka.server.KafkaServer.startup(KafkaServer.scala:312)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
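The notes do not record how this was resolved. The first log line says that the ephemeral node `/brokers/ids/1` already exists and belongs to a different ZooKeeper session. Two commonly reported causes, stated here as assumptions rather than a diagnosis, are a broker restarting before its previous session has expired, or two brokers configured with the same `broker.id`. The toy model below is not ZooKeeper's API; `ToyZooKeeper` and its methods are hypothetical and only illustrate the ownership check behind the first cause.

```python
class NodeExistsError(Exception):
    """Raised when an ephemeral node is already owned by another session."""


class ToyZooKeeper:
    """Toy model: each ephemeral node is owned by the session that created it."""

    def __init__(self):
        self.nodes = {}  # path -> owning session id

    def create_ephemeral(self, path, session_id):
        owner = self.nodes.get(path)
        if owner is not None and owner != session_id:
            raise NodeExistsError(
                f"{path} is owned by session {owner}, not {session_id}"
            )
        self.nodes[path] = session_id

    def expire_session(self, session_id):
        # Ephemeral nodes disappear when their owning session ends.
        self.nodes = {p: s for p, s in self.nodes.items() if s != session_id}


zk = ToyZooKeeper()
zk.create_ephemeral("/brokers/ids/1", session_id="old")

# The broker restarts with a new session while the old session is still alive.
try:
    zk.create_ephemeral("/brokers/ids/1", session_id="new")
    restarted_early = "registered"
except NodeExistsError:
    restarted_early = "node exists"

# Once the old session has expired, registration succeeds.
zk.expire_session("old")
zk.create_ephemeral("/brokers/ids/1", session_id="new")
restarted_late = zk.nodes["/brokers/ids/1"]
```

If the first cause applies, waiting for the old session to time out before restarting the broker should let registration go through. If it is the second, each broker needs a distinct `broker.id`.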
Q: Unable to consume messages (does simply changing the group-id name fix it?)
[2021-08-03 06:41:21,468] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group group-a in Stable state. Created a new member id server@jianwei.lan (github.com/segmentio/kafka-go)-327d2a28-455f-4aa5-bd9e-db94d9e72e2c for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2021-08-03 06:41:21,468] INFO [GroupCoordinator 1]: Preparing to rebalance group group-a in state PreparingRebalance with old generation 64 (__consumer_offsets-47) (reason: Adding new member server@jianwei.lan (github.com/segmentio/kafka-go)-327d2a28-455f-4aa5-bd9e-db94d9e72e2c with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2021-08-03 06:41:47,231] INFO [GroupCoordinator 1]: Member server@jianwei.lan (github.com/segmentio/kafka-go)-874e4ff5-4842-4730-b4a6-e581dbd43567 in group group-a has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-08-03 06:41:47,233] INFO [GroupCoordinator 1]: Stabilized group group-a generation 65 (__consumer_offsets-47) (kafka.coordinator.group.GroupCoordinator)
[2021-08-03 06:41:47,270] INFO [GroupCoordinator 1]: Assignment received from leader for group group-a for generation 65 (kafka.coordinator.group.GroupCoordinator)
Official documentation
Related resources
kafka docker-compose
es-docker-compose
Getting started with Kafka