Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 225aca9

Browse files
authored
Merge pull request #3 from inloco/feature/endpoint-latency-metrics
Feature: add endpoint latency metrics
2 parents a1c54dd + fa1f53e commit 225aca9

File tree

9 files changed

+63
-20
lines changed

9 files changed

+63
-20
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ dashboards as inspiration for your application.
105105
The exported metrics are:
106106
- `kafka_consumer_partition_delay`: number of records between the last record consumed successfully and the last record on kafka, by partition and topic.
107107
- `kafka_consumer_records_consumed_successfully`: number of records consumed successfully by this instance.
108+
- `kafka_consumer_endpoint_latency_histogram_seconds`: endpoint latency in seconds (insertion to elasticsearch).
108109

109110
## Development
110111

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.5.0
1+
0.5.1

cmd/injector.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ func main() {
4646
MetricsUpdateInterval: os.Getenv("KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL"),
4747
RecordType: os.Getenv("KAFKA_CONSUMER_RECORD_TYPE"),
4848
}
49-
50-
service := injector.NewService(logger)
49+
metricsPublisher := metrics.NewMetricsPublisher()
50+
service := injector.NewService(logger, metricsPublisher)
5151
p.SetReadinessCheck(service.ReadinessCheck)
5252

5353
endpoints := injector.MakeEndpoints(service)
@@ -57,7 +57,7 @@ func main() {
5757
level.Error(logger).Log("err", err, "message", "error creating kafka consumer")
5858
panic(err)
5959
}
60-
k := kafka.NewKafka(os.Getenv("KAFKA_ADDRESS"), consumer)
60+
k := kafka.NewKafka(os.Getenv("KAFKA_ADDRESS"), consumer, metricsPublisher)
6161

6262
signals := make(chan os.Signal, 1)
6363
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ services:
5959
- KAFKA_CONSUMER_BATCH_SIZE=10
6060
- METRICS_PORT=9102
6161
- LOG_LEVEL=DEBUG
62+
ports:
63+
- "9102:9102"
6264
producer:
6365
image: 'inlocomedia/kafka-elasticsearch-injector:producer-local'
6466
container_name: producer

src/injector/middleware.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package injector
2+
3+
import (
4+
"time"
5+
6+
"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
7+
"github.com/inloco/kafka-elasticsearch-injector/src/models"
8+
)
9+
10+
type instrumentingMiddleware struct {
11+
metricsPublisher metrics.MetricsPublisher
12+
next Service
13+
}
14+
15+
func (s instrumentingMiddleware) Insert(records []*models.Record) error {
16+
begin := time.Now()
17+
err := s.next.Insert(records)
18+
s.metricsPublisher.RecordEndpointLatency(time.Since(begin).Seconds())
19+
return err
20+
}
21+
22+
func (s instrumentingMiddleware) ReadinessCheck() bool {
23+
return s.next.ReadinessCheck()
24+
}

src/injector/service.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package injector
22

33
import (
4+
"github.com/go-kit/kit/log"
45
"github.com/inloco/kafka-elasticsearch-injector/src/injector/store"
6+
"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
57
"github.com/inloco/kafka-elasticsearch-injector/src/models"
6-
"github.com/go-kit/kit/log"
78
)
89

910
type Service interface {
@@ -23,8 +24,11 @@ func (s basicService) ReadinessCheck() bool {
2324
return s.store.ReadinessCheck()
2425
}
2526

26-
func NewService(logger log.Logger) Service {
27-
return basicService{
28-
store.NewStore(logger),
27+
func NewService(logger log.Logger, metrics metrics.MetricsPublisher) Service {
28+
return instrumentingMiddleware{
29+
metricsPublisher: metrics,
30+
next: basicService{
31+
store.NewStore(logger),
32+
},
2933
}
3034
}

src/kafka/consumer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type topicPartitionOffset struct {
4949
offset int64
5050
}
5151

52-
func NewKafka(address string, consumer Consumer) kafka {
52+
func NewKafka(address string, consumer Consumer, metrics metrics.MetricsPublisher) kafka {
5353
brokers := []string{address}
5454
config := cluster.NewConfig()
5555
config.Consumer.Return.Errors = true
@@ -61,7 +61,7 @@ func NewKafka(address string, consumer Consumer) kafka {
6161
brokers: brokers,
6262
config: config,
6363
consumer: consumer,
64-
metricsPublisher: metrics.NewMetricsPublisher(),
64+
metricsPublisher: metrics,
6565
consumerCh: make(chan *sarama.ConsumerMessage, consumer.BufferSize),
6666
offsetCh: make(chan *topicPartitionOffset),
6767
}

src/kafka/consumer_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/inloco/kafka-elasticsearch-injector/src/elasticsearch"
1818
"github.com/inloco/kafka-elasticsearch-injector/src/kafka/fixtures"
1919
"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
20+
"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
2021
"github.com/inloco/kafka-elasticsearch-injector/src/models"
2122
"github.com/inloco/kafka-elasticsearch-injector/src/schema_registry"
2223
"github.com/olivere/elastic"
@@ -88,7 +89,7 @@ func TestMain(m *testing.M) {
8889
BatchSize: 1,
8990
MetricsUpdateInterval: 30 * time.Second,
9091
}
91-
k = NewKafka("localhost:9092", consumer)
92+
k = NewKafka("localhost:9092", consumer, metrics.NewMetricsPublisher())
9293
retCode := m.Run()
9394
os.Exit(retCode)
9495
}

src/metrics/metrics.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,22 @@ import (
1212
)
1313

1414
type metrics struct {
15-
logger log.Logger
16-
partitionDelay *kitprometheus.Gauge
17-
recordsConsumed *kitprometheus.Counter
18-
lock sync.RWMutex
19-
topicPartitionToOffset map[string]map[int32]int64
15+
logger log.Logger
16+
partitionDelay *kitprometheus.Gauge
17+
recordsConsumed *kitprometheus.Counter
18+
endpointLatencyHistogram *kitprometheus.Summary
19+
lock sync.RWMutex
20+
topicPartitionToOffset map[string]map[int32]int64
2021
}
2122

2223
func (m *metrics) IncrementRecordsConsumed(count int) {
2324
m.recordsConsumed.Add(float64(count))
2425
}
2526

27+
func (m *metrics) RecordEndpointLatency(latency float64) {
28+
m.endpointLatencyHistogram.Observe(latency)
29+
}
30+
2631
func (m *metrics) UpdateOffset(topic string, partition int32, offset int64) {
2732
m.lock.Lock()
2833
currentOffset, exists := m.topicPartitionToOffset[topic][partition]
@@ -58,6 +63,7 @@ type MetricsPublisher interface {
5863
PublishOffsetMetrics(highWaterMarks map[string]map[int32]int64)
5964
UpdateOffset(topic string, partition int32, delay int64)
6065
IncrementRecordsConsumed(count int)
66+
RecordEndpointLatency(latency float64)
6167
}
6268

6369
func NewMetricsPublisher() MetricsPublisher {
@@ -70,11 +76,16 @@ func NewMetricsPublisher() MetricsPublisher {
7076
Name: "kafka_consumer_partition_delay",
7177
Help: "Kafka consumer partition delay",
7278
}, []string{"partition", "topic"})
79+
endpointLatencySummary := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
80+
Name: "kafka_consumer_endpoint_latency_histogram_seconds",
81+
Help: "Kafka consumer endpoint latency histogram in seconds",
82+
}, []string{})
7383
return &metrics{
74-
logger: logger,
75-
partitionDelay: partitionDelay,
76-
recordsConsumed: recordsConsumed,
77-
lock: sync.RWMutex{},
84+
logger: logger,
85+
partitionDelay: partitionDelay,
86+
recordsConsumed: recordsConsumed,
87+
endpointLatencyHistogram: endpointLatencySummary,
88+
lock: sync.RWMutex{},
7889
topicPartitionToOffset: make(map[string]map[int32]int64),
7990
}
8091
}

0 commit comments

Comments
 (0)