// Package message...
//
// Description : 基于kafka实现消息队列
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2021-09-23 11:09 下午
package message

import (
	"git.zhangdeman.cn/zhangdeman/gopkg/middleware/kafka"
	"github.com/pkg/errors"
)

// NewMessageQueueWithKafka 获取基于kafka消息队列的实例
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:11 下午 2021/9/23
func NewMessageQueueWithKafka(producer kafka.Producer, consumer kafka.Consumer, sync bool, dataHandler IKafkaMessageHandler) IMessageQueue {
	return &QueueWithKafka{
		producer:    producer,
		consumer:    consumer,
		sync:        sync,
		dataHandler: dataHandler,
	}
}

// QueueWithKafka 基于kafka的消息队列
//
// Author : zhangdeman001@ke.com<张德满>
//
// Date : 11:10 下午 2021/9/23
type QueueWithKafka struct {
	producer    kafka.Producer       // 生产者
	consumer    kafka.Consumer       // 消费者
	sync        bool                 // 同步发消息 or 异步
	dataHandler IKafkaMessageHandler // 订阅到的数据处理方法
}

// Publish 发布消息
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:17 下午 2021/9/23
func (q *QueueWithKafka) Publish(data interface{}) error {
	var (
		ok            bool
		publishData   kafka.ProducerData
		publishResult kafka.ProducerResult
	)
	if publishData, ok = data.(kafka.ProducerData); !ok {
		return errors.New("data format is not kafka.ProducerData")
	}
	if q.sync {
		publishResult = q.producer.Sync(publishData)
	} else {
		publishResult = q.producer.Async(publishData)
	}
	return publishResult.Err
}

// Subscribe 订阅消息
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:17 下午 2021/9/23
func (q *QueueWithKafka) Subscribe() {
	q.consumer.Consume(q.dataHandler)
}