diff --git a/producer.go b/producer.go index e8f2eea..795736b 100644 --- a/producer.go +++ b/producer.go @@ -115,17 +115,15 @@ func (p *Producer) Async(data ProducerData) ProducerResult { result := ProducerResult{} p.asyncProducer.Input() <- p.buildMessage(data) // 如果打开了Return.Successes配置,则等同于同步方式 + t := time.NewTicker(time.Second) select { case msg := <-p.asyncProducer.Successes(): result.Partition = msg.Partition result.Offset = msg.Offset case err := <-p.asyncProducer.Errors(): result.Err = err - default: - // 为什么要有default ? - // 则这段代码会挂住,因为设置没有要求返回成功config.Producer.Return.Successes = false, - // 那么在select等待的时候producer.Successes()不会返回,producer.Errors()也不会返回(假设没有错误发生),就挂在这儿。 - // 当然可以加一个default分支绕过去,就不会挂住了: + case <-t.C: + result.Err = errors.New("produce data wait result timeout") } return result }