Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 2c0ad55

Browse files
committed
feat(store): backoff only if elasticsearch is overloaded
1 parent e4ae696 commit 2c0ad55

File tree

2 files changed

+8
-5
lines changed

2 files changed

+8
-5
lines changed

src/elasticsearch/elasticsearch.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func (d recordDatabase) CloseClient() {
5353
type InsertResponse struct {
5454
AlreadyExists []string
5555
Retry []*models.ElasticRecord
56+
Overloaded bool
5657
}
5758

5859
func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse, error) {
@@ -81,12 +82,12 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse
8182
}
8283
failed := res.Failed()
8384
var retry []*models.ElasticRecord
85+
overloaded := false
8486
if len(failed) > 0 {
8587
recordMap := make(map[string]*models.ElasticRecord)
8688
for _, rec := range records {
8789
recordMap[rec.ID] = rec
8890
}
89-
overloaded := false
9091
for _, f := range failed {
9192
if f.Status == http.StatusConflict {
9293
continue
@@ -101,10 +102,10 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse
101102
level.Warn(d.logger).Log("message", "insert failed: elasticsearch is overloaded", "retry_count", len(retry))
102103
}
103104
}
104-
return &InsertResponse{alreadyExistsIds, retry}, nil
105+
return &InsertResponse{alreadyExistsIds, retry, overloaded}, nil
105106
}
106107

107-
return &InsertResponse{[]string{}, []*models.ElasticRecord{}}, nil
108+
return &InsertResponse{[]string{}, []*models.ElasticRecord{}, false}, nil
108109
}
109110

110111
func (d recordDatabase) ReadinessCheck() bool {

src/injector/store/store.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ func (s basicStore) Insert(records []*models.Record) error {
3232
if len(res.Retry) == 0 {
3333
break
3434
}
35-
//some records failed to index, backoff then retry
36-
time.Sleep(s.backoff)
35+
//some records failed to index, backoff(if overloaded) then retry
36+
if res.Overloaded {
37+
time.Sleep(s.backoff)
38+
}
3739
s.db.Insert(res.Retry)
3840
}
3941
return nil

0 commit comments

Comments
 (0)