1. Home
  2. Docs
  3. kratos
  4. 框架组件
  5. 消息队列-kafka

消息队列-kafka

interanl/queue/queue.go

    config := sarama.NewConfig()

    config.Producer.Return.Successes = true
    config.Producer.Partitioner = sarama.NewRandomPartitioner

    //client,err := sarama.NewClient([]string{"192.168.2.27:9092"}, config)
    //
    //defer client.Close()

    //if err != nil {
    //  panic(err)
    //}

    producer, err := sarama.NewAsyncProducer([]string{"192.168.2.27:9092"}, config)

    if err != nil {
        panic(err)
    }

    queue := &Queue{
        producer: producer,
    }

    var (
        wg        sync.WaitGroup
        successes, errors int
    )

    wg.Add(1)
    // start a groutines to count successes num
    go func() {
        defer wg.Done()
        for range producer.Successes() {
            successes++
        }
    }()

    wg.Add(1)
    // start a groutines to count error num
    go func() {
        defer wg.Done()
        for err := range producer.Errors() {
            log.Println(err)
            errors++
        }
    }()

interanl/queue/order.go

...
    o.log.Info("queue info, %d", o.queue.producer)

    topic := "test"
    value := "are you ok"

    data, err := json.Marshal(value)
    if err != nil {
        fmt.Printf("[kafka_producer][sendMessage]:%s", err.Error())
        //g.Log.Error("[kafka_producer][sendMessage]:%s", err.Error())
        return nil
    }

    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder(data),
    }

    o.queue.producer.Input() <- msg
...

相关资料

Go-kafka-sdk Shopify/sarama

sarama/docker-compose.yml

kafka docker-compose go

go实现kafka推送


Running Kafka in Docker Machine
easy-swoole/kafka docker-compose

Was this article helpful to you? Yes No 1

How can we help?