internal/queue/queue.go
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewRandomPartitioner

producer, err := sarama.NewAsyncProducer([]string{"192.168.2.27:9092"}, config)
if err != nil {
    panic(err)
}
queue := &Queue{
    producer: producer,
}
var (
    wg                sync.WaitGroup
    successes, errors int
)
wg.Add(1)
// start a goroutine to count successful deliveries
go func() {
    defer wg.Done()
    for range producer.Successes() {
        successes++
    }
}()
wg.Add(1)
// start a goroutine to count and log delivery errors
go func() {
    defer wg.Done()
    for err := range producer.Errors() {
        log.Println(err)
        errors++
    }
}()
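The snippet above never shuts the producer down, so the two counting goroutines would block forever. A minimal shutdown sketch, not from the original post, reusing the local producer, wg, successes and errors defined above:

// Sketch: once the caller is done sending, AsyncClose flushes any buffered
// messages and then closes Successes() and Errors(), which lets both
// counting goroutines above return.
producer.AsyncClose()
wg.Wait()
log.Printf("async producer closed: %d successes, %d errors", successes, errors)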
internal/queue/order.go
...
o.log.Info("queue producer: %v", o.queue.producer)
topic := "test"
value := "are you ok"
data, err := json.Marshal(value)
if err != nil {
    fmt.Printf("[kafka_producer][sendMessage]: %s\n", err.Error())
    return nil
}
msg := &sarama.ProducerMessage{
    Topic: topic,
    Value: sarama.ByteEncoder(data),
}
o.queue.producer.Input() <- msg
...
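Since the payload here is a plain string, json.Marshal only wraps it in JSON quotes. As an alternative sketch (not from the original post, reusing topic and value from the snippet above), sarama's StringEncoder can carry the string directly, so the marshal step and its error branch disappear:

msg := &sarama.ProducerMessage{
    Topic: topic,
    // StringEncoder sends the raw string bytes, no JSON quoting step.
    Value: sarama.StringEncoder(value),
}
o.queue.producer.Input() <- msg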
Related resources
Running Kafka in Docker Machine
easy-swoole/kafka docker-compose