增加kafka消费者
This commit is contained in:
parent
c8fda70466
commit
2e8885b722
26
middleware/kafka/abstract.go
Normal file
26
middleware/kafka/abstract.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
// Package kafka...
|
||||||
|
//
|
||||||
|
// Description : kafka...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2021-09-22 9:07 下午
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import "github.com/Shopify/sarama"
|
||||||
|
|
||||||
|
// IConsumeDataHandler ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 9:07 下午 2021/9/22
|
||||||
|
type IConsumeDataHandler interface {
|
||||||
|
// Execute 处理数据
|
||||||
|
Execute(data *sarama.ConsumerMessage) error
|
||||||
|
// Success 成功回调
|
||||||
|
Success(data *sarama.ConsumerMessage)
|
||||||
|
// Fail 处理失败回调
|
||||||
|
Fail(data *sarama.ConsumerMessage, err error)
|
||||||
|
// Exception 消费出现异常回调
|
||||||
|
Exception(err error)
|
||||||
|
}
|
@ -6,3 +6,110 @@
|
|||||||
//
|
//
|
||||||
// Date : 2021-09-21 3:46 下午
|
// Date : 2021-09-21 3:46 下午
|
||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/Shopify/sarama"
|
||||||
|
)
|
||||||
|
|
||||||
|
// InitConsumer 初始化消费者
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 8:45 下午 2021/9/22
|
||||||
|
func InitConsumer(topic string, consumerGroup string, hostList []string, cfg *sarama.Config) (*Consumer, error) {
|
||||||
|
if nil == cfg {
|
||||||
|
cfg = sarama.NewConfig()
|
||||||
|
cfg.Consumer.Return.Errors = false
|
||||||
|
cfg.Consumer.Offsets.Initial = sarama.OffsetNewest // 默认从最新初开始消费消息
|
||||||
|
}
|
||||||
|
c := &Consumer{
|
||||||
|
topic: topic,
|
||||||
|
host: hostList,
|
||||||
|
cfg: cfg,
|
||||||
|
consumerGroup: consumerGroup,
|
||||||
|
}
|
||||||
|
err := c.init()
|
||||||
|
return c, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Consumer ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 8:52 下午 2021/9/22
|
||||||
|
type Consumer struct {
|
||||||
|
topic string
|
||||||
|
host []string
|
||||||
|
cfg *sarama.Config
|
||||||
|
consumerGroup string
|
||||||
|
client sarama.Client
|
||||||
|
consumerGroupInstance sarama.ConsumerGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// init ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 8:55 下午 2021/9/22
|
||||||
|
func (c *Consumer) init() error {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if c.client, err = sarama.NewClient(c.host, c.cfg); nil != err {
|
||||||
|
return errors.New("init kafka client fail : " + err.Error())
|
||||||
|
}
|
||||||
|
if c.consumerGroupInstance, err = sarama.NewConsumerGroupFromClient(c.consumerGroup, c.client); nil != err {
|
||||||
|
return errors.New("consumer group init fail : " + err.Error())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Consume 消费消息
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 9:04 下午 2021/9/22
|
||||||
|
func (c *Consumer) Consume(dataHandler IConsumeDataHandler) {
|
||||||
|
ctx := context.Background()
|
||||||
|
for {
|
||||||
|
topics := []string{c.consumerGroup}
|
||||||
|
handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler}
|
||||||
|
if err := c.consumerGroupInstance.Consume(ctx, topics, handler); nil != err {
|
||||||
|
handler.handler.Exception(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type consumerGroupHandler struct {
|
||||||
|
name string
|
||||||
|
handler IConsumeDataHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
|
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
|
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
||||||
|
for msg := range claim.Messages() {
|
||||||
|
if err := h.handler.Execute(msg); nil != err {
|
||||||
|
h.handler.Fail(msg, err)
|
||||||
|
// 手动确认消息
|
||||||
|
sess.MarkMessage(msg, "")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
h.handler.Success(msg)
|
||||||
|
// 手动确认消息
|
||||||
|
sess.MarkMessage(msg, "")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Destroy ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 8:59 下午 2021/9/22
|
||||||
|
func (c *Consumer) Destroy() {
|
||||||
|
_ = c.consumerGroupInstance.Close()
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user