@@ -6,13 +6,12 @@ import (
 
 	"time"
 
-	"sync"
-
 	"github.com/Shopify/sarama"
 	"github.com/bsm/sarama-cluster"
 	"github.com/go-kit/kit/endpoint"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
 	"github.com/inloco/kafka-elasticsearch-injector/src/models"
 )
 
@@ -24,9 +23,12 @@ const (
 )
 
 type kafka struct {
-	consumer Consumer
-	config   *cluster.Config
-	brokers  []string
+	consumer         Consumer
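+	// consumerCh fans messages out to the worker goroutines; offsetCh carries
+	// processed offsets back to the metrics publisher for lag reporting.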
+	consumerCh       chan *sarama.ConsumerMessage
+	offsetCh         chan *topicPartitionOffset
+	config           *cluster.Config
+	brokers          []string
+	metricsPublisher metrics.MetricsPublisher
 }
 
 type Consumer struct {
@@ -56,13 +58,16 @@ func NewKafka(address string, consumer Consumer) kafka {
 	config.Version = sarama.V0_10_0_0
 
 	return kafka{
-		brokers:  brokers,
-		config:   config,
-		consumer: consumer,
+		brokers:          brokers,
+		config:           config,
+		consumer:         consumer,
+		metricsPublisher: metrics.NewMetricsPublisher(),
+		consumerCh:       make(chan *sarama.ConsumerMessage, consumer.BufferSize),
+		offsetCh:         make(chan *topicPartitionOffset),
 	}
 }
 
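+// Start consumes the configured topics, fanning messages out to a pool of
+// worker goroutines, and blocks until a signal arrives on the signals channel.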
-func (k *kafka) Start(signals chan os.Signal, notifications chan Notification) {
+func (k *kafka) Start(signals chan os.Signal, notifications chan<- Notification) {
 	topics := k.consumer.Topics
 	concurrency := k.consumer.Concurrency
 	consumer, err := cluster.NewConsumer(k.brokers, k.consumer.Group, topics, k.config)
@@ -72,83 +77,19 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan Notification) {
 	defer consumer.Close()
 
 	buffSize := k.consumer.BatchSize
-	// Fan-out channel
-	consumerCh := make(chan *sarama.ConsumerMessage, k.consumer.BufferSize)
-	// Update offset channel
-	offsetCh := make(chan *topicPartitionOffset)
 	for i := 0; i < concurrency; i++ {
-		go func() {
-			buf := make([]*sarama.ConsumerMessage, buffSize)
-			var decoded []*models.Record
-			idx := 0
-			for {
-				kafkaMsg := <-consumerCh
-				buf[idx] = kafkaMsg
-				idx++
-				for idx == buffSize {
-					if decoded == nil {
-						for _, msg := range buf {
-							req, err := k.consumer.Decoder(nil, msg)
-							if err != nil {
-								level.Error(k.consumer.Logger).Log(
-									"message", "Error decoding message",
-									"err", err.Error(),
-								)
-								continue
-							}
-							decoded = append(decoded, req)
-						}
-					}
-					if res, err := k.consumer.Endpoint(context.Background(), decoded); err != nil {
-						level.Error(k.consumer.Logger).Log("message", "error on endpoint call", "err", err.Error())
-						var _ = res // ignore res (for now)
-						continue
-					}
-					notifications <- Inserted
-					incrementRecordsConsumed(buffSize)
-					for _, msg := range buf {
-						offsetCh <- &topicPartitionOffset{msg.Topic, msg.Partition, msg.Offset}
-						consumer.MarkOffset(msg, "") // mark message as processed
-					}
-					decoded = nil
-					idx = 0
-				}
-			}
-		}()
+		go k.worker(consumer, buffSize, notifications)
 	}
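+	// Forward each processed offset from the workers to the metrics publisher.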
-	lock := sync.RWMutex{}
-	topicPartitionToOffset := make(map[string]map[int32]int64)
 	go func() {
 		for {
-			offset := <-offsetCh
-			lock.Lock()
-			currentOffset, exists := topicPartitionToOffset[offset.topic][offset.partition]
-			if !exists || offset.offset > currentOffset {
-				_, exists := topicPartitionToOffset[offset.topic]
-				if !exists {
-					topicPartitionToOffset[offset.topic] = make(map[int32]int64)
-				}
-				topicPartitionToOffset[offset.topic][offset.partition] = offset.offset
-			}
-			lock.Unlock()
+			offset := <-k.offsetCh
+			k.metricsPublisher.UpdateOffset(offset.topic, offset.partition, offset.offset)
 		}
 	}()
 
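+	// Periodically publish consumer-lag metrics from the cluster's high-water marks.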
 	go func() {
 		for range time.Tick(k.consumer.MetricsUpdateInterval) {
-			for topic, partitions := range consumer.HighWaterMarks() {
-				for partition, maxOffset := range partitions {
-					lock.RLock()
-					offset, ok := topicPartitionToOffset[topic][partition]
-					lock.RUnlock()
-					if ok {
-						delay := maxOffset - offset
-						level.Info(k.consumer.Logger).Log("message", "updating partition offset metric",
-							"partition", partition, "maxOffset", maxOffset, "current", offset, "delay", delay)
-						updateOffset(topic, partition, delay)
-					}
-				}
-			}
+			k.metricsPublisher.PublishOffsetMetrics(consumer.HighWaterMarks())
 		}
 	}()
 
@@ -157,13 +98,13 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan Notification) {
 		select {
 		case msg, more := <-consumer.Messages():
 			if more {
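+				// Warn when the fan-out channel is saturated; the send below
+				// will block until a worker frees space.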
-				if len(consumerCh) >= cap(consumerCh) {
+				if len(k.consumerCh) >= cap(k.consumerCh) {
 					level.Warn(k.consumer.Logger).Log(
 						"message", "Buffer is full ",
-						"channelSize", cap(consumerCh),
+						"channelSize", cap(k.consumerCh),
 					)
 				}
-				consumerCh <- msg
+				k.consumerCh <- msg
 			}
 		case err, more := <-consumer.Errors():
 			if more {
@@ -186,5 +127,43 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan Notification) {
 			return
 		}
 	}
+}
 
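+// worker drains the shared consumer channel into a fixed-size buffer, decodes
+// each full batch into records, ships the batch through the endpoint, and then
+// reports offsets and marks the messages as processed.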
+func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications chan<- Notification) {
+	buf := make([]*sarama.ConsumerMessage, buffSize)
+	var decoded []*models.Record
+	idx := 0
+	for {
+		kafkaMsg := <-k.consumerCh
+		buf[idx] = kafkaMsg
+		idx++
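+		// Buffer full: decode once, then retry the endpoint call until the
+		// whole batch is flushed and its offsets can be marked.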
+		for idx == buffSize {
+			if decoded == nil {
+				for _, msg := range buf {
+					req, err := k.consumer.Decoder(nil, msg)
+					if err != nil {
+						level.Error(k.consumer.Logger).Log(
+							"message", "Error decoding message",
+							"err", err.Error(),
+						)
+						continue
+					}
+					decoded = append(decoded, req)
+				}
+			}
+			if res, err := k.consumer.Endpoint(context.Background(), decoded); err != nil {
+				level.Error(k.consumer.Logger).Log("message", "error on endpoint call", "err", err.Error())
+				var _ = res // ignore res (for now)
+				continue
+			}
+			notifications <- Inserted
+			k.metricsPublisher.IncrementRecordsConsumed(buffSize)
+			for _, msg := range buf {
+				k.offsetCh <- &topicPartitionOffset{msg.Topic, msg.Partition, msg.Offset}
+				consumer.MarkOffset(msg, "") // mark message as processed
+			}
+			decoded = nil
+			idx = 0
+		}
+	}
 }