Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit e836f3a

Browse files
authored
Merge pull request #6 from inloco/feature/buffer-full-metrics
feat(metrics): add buffer full gauge
2 parents 225aca9 + c390a82 commit e836f3a

File tree

4 files changed

+22
-3
lines changed

4 files changed

+22
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ The exported metrics are:
106106
- `kafka_consumer_partition_delay`: number of records betweeen last record consumed successfully and the last record on kafka, by partition and topic.
107107
- `kafka_consumer_records_consumed_successfully`: number of records consumed successfully by this instance.
108108
- `kafka_consumer_endpoint_latency_histogram_seconds`: endpoint latency in seconds (insertion to elasticsearch).
109+
- `kafka_consumer_buffer_full`: indicates whether the app buffer is full(meaning that elasticsearch is not being able to keep up with the topic volume).
109110

110111
## Development
111112

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.5.1
1+
0.5.2

src/kafka/consumer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,10 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan<- Notification)
103103
"message", "Buffer is full ",
104104
"channelSize", cap(k.consumerCh),
105105
)
106+
k.metricsPublisher.BufferFull(true)
106107
}
107108
k.consumerCh <- msg
109+
k.metricsPublisher.BufferFull(false)
108110
}
109111
case err, more := <-consumer.Errors():
110112
if more {

src/metrics/metrics.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,21 @@ package metrics
33
import (
44
"strconv"
55

6+
"sync"
7+
68
"github.com/go-kit/kit/log"
79
"github.com/go-kit/kit/log/level"
810
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
911
"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
1012
stdprometheus "github.com/prometheus/client_golang/prometheus"
11-
"sync"
1213
)
1314

1415
type metrics struct {
1516
logger log.Logger
1617
partitionDelay *kitprometheus.Gauge
1718
recordsConsumed *kitprometheus.Counter
1819
endpointLatencyHistogram *kitprometheus.Summary
20+
bufferFullGauge *kitprometheus.Gauge
1921
lock sync.RWMutex
2022
topicPartitionToOffset map[string]map[int32]int64
2123
}
@@ -59,11 +61,20 @@ func (m *metrics) PublishOffsetMetrics(highWaterMarks map[string]map[int32]int64
5961
}
6062
}
6163

64+
func (m *metrics) BufferFull(full bool) {
65+
val := 0.0
66+
if full {
67+
val = 1.0
68+
}
69+
m.bufferFullGauge.Set(val)
70+
}
71+
6272
type MetricsPublisher interface {
6373
PublishOffsetMetrics(highWaterMarks map[string]map[int32]int64)
6474
UpdateOffset(topic string, partition int32, delay int64)
6575
IncrementRecordsConsumed(count int)
6676
RecordEndpointLatency(latency float64)
77+
BufferFull(full bool)
6778
}
6879

6980
func NewMetricsPublisher() MetricsPublisher {
@@ -80,12 +91,17 @@ func NewMetricsPublisher() MetricsPublisher {
8091
Name: "kafka_consumer_endpoint_latency_histogram_seconds",
8192
Help: "Kafka consumer endpoint latency histogram in seconds",
8293
}, []string{})
94+
bufferFullGauge := kitprometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
95+
Name: "kafka_consumer_buffer_full",
96+
Help: "Kafka consumer boolean indicating if app buffer is full",
97+
}, []string{})
8398
return &metrics{
8499
logger: logger,
85100
partitionDelay: partitionDelay,
86101
recordsConsumed: recordsConsumed,
87102
endpointLatencyHistogram: endpointLatencySummary,
88-
lock: sync.RWMutex{},
103+
bufferFullGauge: bufferFullGauge,
104+
lock: sync.RWMutex{},
89105
topicPartitionToOffset: make(map[string]map[int32]int64),
90106
}
91107
}

0 commit comments

Comments
 (0)