Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit d5c74cf

Browse files
committed
feat(metrics): add buffer full gauge
1 parent 225aca9 commit d5c74cf

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

src/kafka/consumer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,10 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan<- Notification)
103103
"message", "Buffer is full ",
104104
"channelSize", cap(k.consumerCh),
105105
)
106+
k.metricsPublisher.BufferFull(true)
106107
}
107108
k.consumerCh <- msg
109+
k.metricsPublisher.BufferFull(false)
108110
}
109111
case err, more := <-consumer.Errors():
110112
if more {

src/metrics/metrics.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,21 @@ package metrics
33
import (
44
"strconv"
55

6+
"sync"
7+
68
"github.com/go-kit/kit/log"
79
"github.com/go-kit/kit/log/level"
810
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
911
"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
1012
stdprometheus "github.com/prometheus/client_golang/prometheus"
11-
"sync"
1213
)
1314

1415
type metrics struct {
1516
logger log.Logger
1617
partitionDelay *kitprometheus.Gauge
1718
recordsConsumed *kitprometheus.Counter
1819
endpointLatencyHistogram *kitprometheus.Summary
20+
bufferFullGauge *kitprometheus.Gauge
1921
lock sync.RWMutex
2022
topicPartitionToOffset map[string]map[int32]int64
2123
}
@@ -59,11 +61,20 @@ func (m *metrics) PublishOffsetMetrics(highWaterMarks map[string]map[int32]int64
5961
}
6062
}
6163

64+
func (m *metrics) BufferFull(full bool) {
65+
val := 0.0
66+
if full {
67+
val = 1.0
68+
}
69+
m.bufferFullGauge.Set(val)
70+
}
71+
6272
type MetricsPublisher interface {
6373
PublishOffsetMetrics(highWaterMarks map[string]map[int32]int64)
6474
UpdateOffset(topic string, partition int32, delay int64)
6575
IncrementRecordsConsumed(count int)
6676
RecordEndpointLatency(latency float64)
77+
BufferFull(full bool)
6778
}
6879

6980
func NewMetricsPublisher() MetricsPublisher {
@@ -80,12 +91,17 @@ func NewMetricsPublisher() MetricsPublisher {
8091
Name: "kafka_consumer_endpoint_latency_histogram_seconds",
8192
Help: "Kafka consumer endpoint latency histogram in seconds",
8293
}, []string{})
94+
bufferFullGauge := kitprometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
95+
Name: "kafka_consumer_buffer_full",
96+
Help: "Kafka consumer boolean indicating if app buffer is full",
97+
}, []string{})
8398
return &metrics{
8499
logger: logger,
85100
partitionDelay: partitionDelay,
86101
recordsConsumed: recordsConsumed,
87102
endpointLatencyHistogram: endpointLatencySummary,
88-
lock: sync.RWMutex{},
103+
bufferFullGauge: bufferFullGauge,
104+
lock: sync.RWMutex{},
89105
topicPartitionToOffset: make(map[string]map[int32]int64),
90106
}
91107
}

0 commit comments

Comments
 (0)