Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 7d171d8

Browse files
authored
Merge pull request #7 from inloco/fix/bulk-create-handle-errors
Fix/bulk create handle errors
2 parents e836f3a + 1d035f5 commit 7d171d8

File tree

8 files changed

+114
-20
lines changed

8 files changed

+114
-20
lines changed

Gopkg.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ To create new injectors for your topics, you should create a new kubernetes depl
2929
- `LOG_LEVEL` Determines the log level for the app. Should be set to DEBUG, WARN, NONE or INFO. Defaults to INFO. **OPTIONAL**
3030
- `METRICS_PORT` Port to export app metrics **REQUIRED**
3131
- `ES_BULK_TIMEOUT` Timeout for elasticsearch bulk writes in the format of golang's `time.ParseDuration`. Default value is 1s **OPTIONAL**
32+
- `ES_BULK_BACKOFF` Constant backoff applied before retrying when elasticsearch is overloaded, in the format of golang's `time.ParseDuration`. Default value is 1s **OPTIONAL**
3233
- `KAFKA_CONSUMER_RECORD_TYPE` Kafka record type. Should be set to "avro" or "json". Defaults to avro. **OPTIONAL**
3334
- `KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL` The interval at which the app updates the exported metrics, in the format of golang's `time.ParseDuration`. Defaults to 30s. **OPTIONAL**
3435

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.5.2
1+
0.5.3

src/elasticsearch/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type Config struct {
1313
DocIDColumn string
1414
BlacklistedColumns []string
1515
BulkTimeout time.Duration
16+
Backoff time.Duration
1617
}
1718

1819
func NewConfig() Config {
@@ -24,12 +25,21 @@ func NewConfig() Config {
2425
timeout = d
2526
}
2627
}
28+
backoffStr, exists := os.LookupEnv("ES_BULK_BACKOFF")
29+
backoff := 1 * time.Second
30+
if exists {
31+
d, err := time.ParseDuration(backoffStr)
32+
if err == nil {
33+
backoff = d
34+
}
35+
}
2736
return Config{
2837
Host: os.Getenv("ELASTICSEARCH_HOST"),
2938
Index: os.Getenv("ES_INDEX"),
3039
IndexColumn: os.Getenv("ES_INDEX_COLUMN"),
3140
DocIDColumn: os.Getenv("ES_DOC_ID_COLUMN"),
3241
BlacklistedColumns: strings.Split(os.Getenv("ES_BLACKLISTED_COLUMNS"), ","),
3342
BulkTimeout: timeout,
43+
Backoff: backoff,
3444
}
3545
}

src/elasticsearch/elasticsearch.go

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import (
55

66
"fmt"
77

8+
"net/http"
9+
810
"github.com/go-kit/kit/log"
911
"github.com/go-kit/kit/log/level"
1012
"github.com/inloco/kafka-elasticsearch-injector/src/models"
1113
"github.com/olivere/elastic"
12-
"github.com/pkg/errors"
1314
)
1415

1516
var esClient *elastic.Client
@@ -21,7 +22,7 @@ type basicDatabase interface {
2122

2223
type RecordDatabase interface {
2324
basicDatabase
24-
Insert(records []*models.ElasticRecord) error
25+
Insert(records []*models.ElasticRecord) (*InsertResponse, error)
2526
ReadinessCheck() bool
2627
}
2728

@@ -49,24 +50,62 @@ func (d recordDatabase) CloseClient() {
4950
}
5051
}
5152

52-
func (d recordDatabase) Insert(records []*models.ElasticRecord) error {
53+
type InsertResponse struct {
54+
AlreadyExists []string
55+
Retry []*models.ElasticRecord
56+
Overloaded bool
57+
}
58+
59+
func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse, error) {
5360
bulkRequest, err := d.buildBulkRequest(records)
5461
if err != nil {
55-
return err
62+
return nil, err
5663
}
5764
timeout := d.config.BulkTimeout
5865
ctx, cancel := context.WithTimeout(context.Background(), timeout)
5966
defer cancel()
6067
res, err := bulkRequest.Do(ctx)
61-
if err == nil {
62-
if res.Errors {
63-
for _, f := range res.Failed() {
64-
return errors.New(fmt.Sprintf("%v", f.Error))
68+
69+
if err != nil {
70+
return nil, err
71+
}
72+
if res.Errors {
73+
created := res.Created()
74+
var alreadyExistsIds []string
75+
for _, c := range created {
76+
if c.Status == http.StatusConflict {
77+
alreadyExistsIds = append(alreadyExistsIds, c.Id)
78+
}
79+
}
80+
if len(alreadyExistsIds) > 0 {
81+
level.Warn(d.logger).Log("message", "document already exists", "doc_count", len(alreadyExistsIds))
82+
}
83+
failed := res.Failed()
84+
var retry []*models.ElasticRecord
85+
overloaded := false
86+
if len(failed) > 0 {
87+
recordMap := make(map[string]*models.ElasticRecord)
88+
for _, rec := range records {
89+
recordMap[rec.ID] = rec
90+
}
91+
for _, f := range failed {
92+
if f.Status == http.StatusConflict {
93+
continue
94+
}
95+
retry = append(retry, recordMap[f.Id])
96+
if f.Status == http.StatusTooManyRequests {
97+
//es is overloaded, backoff
98+
overloaded = true
99+
}
100+
}
101+
if overloaded {
102+
level.Warn(d.logger).Log("message", "insert failed: elasticsearch is overloaded", "retry_count", len(retry))
65103
}
66104
}
105+
return &InsertResponse{alreadyExistsIds, retry, overloaded}, nil
67106
}
68107

69-
return err
108+
return &InsertResponse{[]string{}, []*models.ElasticRecord{}, false}, nil
70109
}
71110

72111
func (d recordDatabase) ReadinessCheck() bool {
@@ -82,7 +121,8 @@ func (d recordDatabase) ReadinessCheck() bool {
82121
func (d recordDatabase) buildBulkRequest(records []*models.ElasticRecord) (*elastic.BulkService, error) {
83122
bulkRequest := d.GetClient().Bulk()
84123
for _, record := range records {
85-
bulkRequest.Add(elastic.NewBulkIndexRequest().Index(record.Index).
124+
bulkRequest.Add(elastic.NewBulkIndexRequest().OpType("create").
125+
Index(record.Index).
86126
Type(record.Type).
87127
Id(record.ID).
88128
Doc(record.Json))

src/elasticsearch/elasticsearch_test.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111

1212
"encoding/json"
1313

14+
"strconv"
15+
1416
"github.com/inloco/kafka-elasticsearch-injector/src/kafka/fixtures"
1517
"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
1618
"github.com/inloco/kafka-elasticsearch-injector/src/models"
@@ -72,8 +74,30 @@ func TestRecordDatabase_ReadinessCheck(t *testing.T) {
7274

7375
func TestRecordDatabase_Insert(t *testing.T) {
7476
record, id := fixtures.NewElasticRecord()
75-
err := db.Insert([]*models.ElasticRecord{record})
77+
_, err := db.Insert([]*models.ElasticRecord{record})
78+
db.GetClient().Refresh("_all").Do(context.Background())
79+
var recordFromES fixtures.FixtureRecord
80+
if assert.NoError(t, err) {
81+
count, err := db.GetClient().Count(record.Index).Do(context.Background())
82+
if assert.NoError(t, err) {
83+
assert.Equal(t, int64(1), count)
84+
}
85+
res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background())
86+
if assert.NoError(t, err) {
87+
json.Unmarshal(*res.Source, &recordFromES)
88+
}
89+
assert.Equal(t, recordFromES.Id, id)
90+
}
91+
db.GetClient().DeleteByQuery(record.Index).Query(elastic.MatchAllQuery{}).Do(context.Background())
92+
}
93+
94+
func TestRecordDatabase_Insert_RepeatedId(t *testing.T) {
95+
record, id := fixtures.NewElasticRecord()
96+
_, err := db.Insert([]*models.ElasticRecord{record})
7697
db.GetClient().Refresh("_all").Do(context.Background())
98+
res, err := db.Insert([]*models.ElasticRecord{record})
99+
assert.Len(t, res.AlreadyExists, 1)
100+
assert.Contains(t, res.AlreadyExists, strconv.Itoa(int(id)))
77101
var recordFromES fixtures.FixtureRecord
78102
if assert.NoError(t, err) {
79103
count, err := db.GetClient().Count(record.Index).Do(context.Background())
@@ -91,7 +115,7 @@ func TestRecordDatabase_Insert(t *testing.T) {
91115

92116
func TestRecordDatabase_Insert_Multiple(t *testing.T) {
93117
record, id := fixtures.NewElasticRecord()
94-
err := db.Insert([]*models.ElasticRecord{record, record})
118+
_, err := db.Insert([]*models.ElasticRecord{record, record})
95119
db.GetClient().Refresh("_all").Do(context.Background())
96120
var recordFromES fixtures.FixtureRecord
97121
if assert.NoError(t, err) {

src/injector/store/store.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package store
22

33
import (
4+
"time"
5+
46
"github.com/go-kit/kit/log"
57
"github.com/inloco/kafka-elasticsearch-injector/src/elasticsearch"
68
"github.com/inloco/kafka-elasticsearch-injector/src/models"
@@ -12,16 +14,31 @@ type Store interface {
1214
}
1315

1416
type basicStore struct {
15-
db elasticsearch.RecordDatabase
16-
codec elasticsearch.Codec
17+
db elasticsearch.RecordDatabase
18+
codec elasticsearch.Codec
19+
backoff time.Duration
1720
}
1821

1922
func (s basicStore) Insert(records []*models.Record) error {
2023
elasticRecords, err := s.codec.EncodeElasticRecords(records)
2124
if err != nil {
2225
return err
2326
}
24-
return s.db.Insert(elasticRecords)
27+
for {
28+
res, err := s.db.Insert(elasticRecords)
29+
if err != nil {
30+
return err
31+
}
32+
if len(res.Retry) == 0 {
33+
break
34+
}
35+
//some records failed to index, backoff(if overloaded) then retry
36+
if res.Overloaded {
37+
time.Sleep(s.backoff)
38+
}
39+
s.db.Insert(res.Retry)
40+
}
41+
return nil
2542
}
2643

2744
func (s basicStore) ReadinessCheck() bool {
@@ -31,7 +48,8 @@ func (s basicStore) ReadinessCheck() bool {
3148
func NewStore(logger log.Logger) Store {
3249
config := elasticsearch.NewConfig()
3350
return basicStore{
34-
db: elasticsearch.NewDatabase(logger, config),
35-
codec: elasticsearch.NewCodec(logger, config),
51+
db: elasticsearch.NewDatabase(logger, config),
52+
codec: elasticsearch.NewCodec(logger, config),
53+
backoff: config.Backoff,
3654
}
3755
}

src/kafka/consumer_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ func (s fixtureService) Insert(records []*models.Record) error {
3434
if err != nil {
3535
return err
3636
}
37-
return s.db.Insert(elasticRecords)
37+
_, err = s.db.Insert(elasticRecords)
38+
return err
3839
}
3940

4041
func (s fixtureService) ReadinessCheck() bool {

0 commit comments

Comments
 (0)