Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit a1c54dd

Browse files
authored
Merge pull request #5 from inloco/feature/custom-doc-id
Feature/custom doc
2 parents 85cee80 + 9f0b5b1 commit a1c54dd

File tree

11 files changed

+248
-80
lines changed

11 files changed

+248
-80
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ To create new injectors for your topics, you should create a new kubernetes depl
2525
- `KAFKA_CONSUMER_BATCH_SIZE` Number of records to accumulate before sending them to elasticsearch(for each goroutine). Default value is 100 **OPTIONAL**
2626
- `ES_INDEX_COLUMN` Record field to append to index name. Ex: to create one ES index per campaign, use "campaign_id" here **OPTIONAL**
2727
- `ES_BLACKLISTED_COLUMNS` Comma separated list of record fields to filter before sending to elasticsearch. Defaults to empty string. **OPTIONAL**
28+
- `ES_DOC_ID_COLUMN` Record field to be the document ID of Elasticsearch. Defaults to "kafkaRecordPartition:kafkaRecordOffset". **OPTIONAL**
2829
- `LOG_LEVEL` Determines the log level for the app. Should be set to DEBUG, WARN, NONE or INFO. Defaults to INFO. **OPTIONAL**
2930
- `METRICS_PORT` Port to export app metrics **REQUIRED**
3031
- `ES_BULK_TIMEOUT` Timeout for elasticsearch bulk writes in the format of golang's `time.ParseDuration`. Default value is 1s **OPTIONAL**

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.4.2
1+
0.5.0

src/elasticsearch/codec.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package elasticsearch
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/go-kit/kit/log"
7+
"github.com/go-kit/kit/log/level"
8+
"github.com/inloco/kafka-elasticsearch-injector/src/models"
9+
)
10+
11+
type Codec interface {
12+
EncodeElasticRecords(records []*models.Record) ([]*models.ElasticRecord, error)
13+
}
14+
15+
type basicCodec struct {
16+
config Config
17+
logger log.Logger
18+
}
19+
20+
func NewCodec(logger log.Logger, config Config) Codec {
21+
return basicCodec{logger: logger, config: config}
22+
}
23+
24+
func (c basicCodec) EncodeElasticRecords(records []*models.Record) ([]*models.ElasticRecord, error) {
25+
elasticRecords := make([]*models.ElasticRecord, len(records))
26+
for idx, record := range records {
27+
index, err := c.getDatabaseIndex(record)
28+
if err != nil {
29+
return nil, err
30+
}
31+
32+
docID, err := c.getDatabaseDocID(record)
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
elasticRecords[idx] = &models.ElasticRecord{
38+
Index: index,
39+
Type: record.Topic,
40+
ID: docID,
41+
Json: record.FilteredFieldsJSON(c.config.BlacklistedColumns),
42+
}
43+
}
44+
45+
return elasticRecords, nil
46+
}
47+
48+
func (c basicCodec) getDatabaseIndex(record *models.Record) (string, error) {
49+
indexPrefix := c.config.Index
50+
if indexPrefix == "" {
51+
indexPrefix = record.Topic
52+
}
53+
54+
indexColumn := c.config.IndexColumn
55+
indexSuffix := record.FormatTimestamp()
56+
if indexColumn != "" {
57+
newIndexSuffix, err := record.GetValueForField(indexColumn)
58+
if err != nil {
59+
level.Error(c.logger).Log("err", err, "message", "Could not get column value from record.")
60+
return "", err
61+
}
62+
indexSuffix = newIndexSuffix
63+
}
64+
65+
return fmt.Sprintf("%s-%s", indexPrefix, indexSuffix), nil
66+
}
67+
68+
func (c basicCodec) getDatabaseDocID(record *models.Record) (string, error) {
69+
docID := record.GetId()
70+
71+
docIDColumn := c.config.DocIDColumn
72+
if docIDColumn != "" {
73+
newDocID, err := record.GetValueForField(docIDColumn)
74+
if err != nil {
75+
level.Error(c.logger).Log("err", err, "message", "Could not get doc id value from record.")
76+
return "", err
77+
}
78+
docID = newDocID
79+
}
80+
return docID, nil
81+
}

src/elasticsearch/codec_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package elasticsearch
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
11+
"github.com/inloco/kafka-elasticsearch-injector/src/kafka/fixtures"
12+
"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
13+
"github.com/inloco/kafka-elasticsearch-injector/src/models"
14+
)
15+
16+
var codecLogger = logger_builder.NewLogger("elasticsearch-test")
17+
18+
func TestCodec_EncodeElasticRecords(t *testing.T) {
19+
codec := &basicCodec{
20+
config: Config{},
21+
logger: codecLogger,
22+
}
23+
record, id, value := fixtures.NewRecord(time.Now())
24+
25+
elasticRecords, err := codec.EncodeElasticRecords([]*models.Record{record})
26+
if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) {
27+
elasticRecord := elasticRecords[0]
28+
assert.Equal(t, fmt.Sprintf("%s-%s", record.Topic, record.FormatTimestamp()), elasticRecord.Index)
29+
assert.Equal(t, record.Topic, elasticRecord.Type)
30+
assert.Equal(t, fmt.Sprintf("%d:%d", record.Partition, record.Offset), elasticRecord.ID)
31+
assert.Equal(t, id, elasticRecord.Json["id"])
32+
assert.Equal(t, value, elasticRecord.Json["value"])
33+
}
34+
}
35+
36+
func TestCodec_EncodeElasticRecords_ColumnsBlacklist(t *testing.T) {
37+
codec := &basicCodec{
38+
config: Config{BlacklistedColumns: []string{"value"}},
39+
logger: codecLogger,
40+
}
41+
record, _, _ := fixtures.NewRecord(time.Now())
42+
43+
elasticRecords, err := codec.EncodeElasticRecords([]*models.Record{record})
44+
if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) {
45+
elasticRecord := elasticRecords[0]
46+
assert.Contains(t, elasticRecord.Json, "id")
47+
assert.NotContains(t, elasticRecord.Json, "value")
48+
}
49+
}
50+
51+
func TestCodec_EncodeElasticRecords_IndexColumn(t *testing.T) {
52+
indexPrefix := "prefix"
53+
54+
codec := &basicCodec{
55+
config: Config{Index: indexPrefix, IndexColumn: "id"},
56+
logger: codecLogger,
57+
}
58+
record, id, _ := fixtures.NewRecord(time.Now())
59+
60+
elasticRecords, err := codec.EncodeElasticRecords([]*models.Record{record})
61+
if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) {
62+
elasticRecord := elasticRecords[0]
63+
assert.Equal(t, fmt.Sprintf("%v-%v", indexPrefix, id), elasticRecord.Index)
64+
}
65+
}
66+
67+
func TestCodec_EncodeElasticRecords_InexistentIndexColumn(t *testing.T) {
68+
codec := &basicCodec{
69+
config: Config{IndexColumn: "invalid"},
70+
logger: codecLogger,
71+
}
72+
record, _, _ := fixtures.NewRecord(time.Now())
73+
74+
_, err := codec.EncodeElasticRecords([]*models.Record{record})
75+
assert.Error(t, err)
76+
}
77+
78+
func TestCodec_EncodeElasticRecords_DocIDColumn(t *testing.T) {
79+
codec := &basicCodec{
80+
config: Config{DocIDColumn: "id"},
81+
logger: codecLogger,
82+
}
83+
record, id, _ := fixtures.NewRecord(time.Now())
84+
85+
elasticRecords, err := codec.EncodeElasticRecords([]*models.Record{record})
86+
if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) {
87+
elasticRecord := elasticRecords[0]
88+
assert.Equal(t, strconv.Itoa(int(id)), elasticRecord.ID)
89+
}
90+
}
91+
92+
func TestCodec_EncodeElasticRecords_InexistentDocIDColumn(t *testing.T) {
93+
codec := &basicCodec{
94+
config: Config{DocIDColumn: "invalid"},
95+
logger: codecLogger,
96+
}
97+
record, _, _ := fixtures.NewRecord(time.Now())
98+
99+
_, err := codec.EncodeElasticRecords([]*models.Record{record})
100+
assert.Error(t, err)
101+
}

src/elasticsearch/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ type Config struct {
1010
Host string
1111
Index string
1212
IndexColumn string
13+
DocIDColumn string
1314
BlacklistedColumns []string
1415
BulkTimeout time.Duration
1516
}
@@ -27,6 +28,7 @@ func NewConfig() Config {
2728
Host: os.Getenv("ELASTICSEARCH_HOST"),
2829
Index: os.Getenv("ES_INDEX"),
2930
IndexColumn: os.Getenv("ES_INDEX_COLUMN"),
31+
DocIDColumn: os.Getenv("ES_DOC_ID_COLUMN"),
3032
BlacklistedColumns: strings.Split(os.Getenv("ES_BLACKLISTED_COLUMNS"), ","),
3133
BulkTimeout: timeout,
3234
}

src/elasticsearch/elasticsearch.go

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import (
55

66
"fmt"
77

8-
"github.com/inloco/kafka-elasticsearch-injector/src/models"
98
"github.com/go-kit/kit/log"
109
"github.com/go-kit/kit/log/level"
10+
"github.com/inloco/kafka-elasticsearch-injector/src/models"
1111
"github.com/olivere/elastic"
1212
"github.com/pkg/errors"
1313
)
@@ -21,7 +21,7 @@ type basicDatabase interface {
2121

2222
type RecordDatabase interface {
2323
basicDatabase
24-
Insert(records []*models.Record) error
24+
Insert(records []*models.ElasticRecord) error
2525
ReadinessCheck() bool
2626
}
2727

@@ -49,7 +49,7 @@ func (d recordDatabase) CloseClient() {
4949
}
5050
}
5151

52-
func (d recordDatabase) Insert(records []*models.Record) error {
52+
func (d recordDatabase) Insert(records []*models.ElasticRecord) error {
5353
bulkRequest, err := d.buildBulkRequest(records)
5454
if err != nil {
5555
return err
@@ -79,28 +79,13 @@ func (d recordDatabase) ReadinessCheck() bool {
7979
return true
8080
}
8181

82-
func (d recordDatabase) buildBulkRequest(records []*models.Record) (*elastic.BulkService, error) {
82+
func (d recordDatabase) buildBulkRequest(records []*models.ElasticRecord) (*elastic.BulkService, error) {
8383
bulkRequest := d.GetClient().Bulk()
8484
for _, record := range records {
85-
indexName := d.config.Index
86-
if indexName == "" {
87-
indexName = record.Topic
88-
}
89-
indexColumn := d.config.IndexColumn
90-
indexColumnValue := record.FormatTimestamp()
91-
if indexColumn != "" {
92-
newIndexColumnValue, err := record.GetValueForField(indexColumn)
93-
if err != nil {
94-
level.Error(d.logger).Log("err", err, "message", "Could not get column value from record.")
95-
return nil, err
96-
}
97-
indexColumnValue = newIndexColumnValue
98-
}
99-
index := fmt.Sprintf("%s-%s", indexName, indexColumnValue)
100-
bulkRequest.Add(elastic.NewBulkIndexRequest().Index(index).
101-
Type(record.Topic).
102-
Id(record.GetId()).
103-
Doc(record.FilteredFieldsJSON(d.config.BlacklistedColumns)))
85+
bulkRequest.Add(elastic.NewBulkIndexRequest().Index(record.Index).
86+
Type(record.Type).
87+
Id(record.ID).
88+
Doc(record.Json))
10489
}
10590
return bulkRequest, nil
10691
}

src/elasticsearch/elasticsearch_test.go

Lines changed: 10 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"context"
99

10-
"fmt"
1110
"time"
1211

1312
"encoding/json"
@@ -27,15 +26,7 @@ var config = Config{
2726
BlacklistedColumns: []string{},
2827
BulkTimeout: 10 * time.Second,
2928
}
30-
var configIndexColumnBlacklist = Config{
31-
Host: "http://localhost:9200",
32-
Index: "my-topic",
33-
IndexColumn: "id",
34-
BlacklistedColumns: []string{"id"},
35-
BulkTimeout: 10 * time.Second,
36-
}
3729
var db = NewDatabase(logger, config)
38-
var dbIndexColumnBlacklist = NewDatabase(logger, configIndexColumnBlacklist)
3930
var template = `
4031
{
4132
"template": "my-topic-*",
@@ -69,10 +60,8 @@ var template = `
6960

7061
func TestMain(m *testing.M) {
7162
setupDB(db)
72-
setupDB(dbIndexColumnBlacklist)
7363
retCode := m.Run()
7464
tearDownDB(db)
75-
tearDownDB(dbIndexColumnBlacklist)
7665
os.Exit(retCode)
7766
}
7867

@@ -82,67 +71,41 @@ func TestRecordDatabase_ReadinessCheck(t *testing.T) {
8271
}
8372

8473
func TestRecordDatabase_Insert(t *testing.T) {
85-
now := time.Now()
86-
record, id, _ := fixtures.NewRecord(now)
87-
index := fmt.Sprintf("%s-%s", config.Index, record.FormatTimestamp())
88-
err := db.Insert([]*models.Record{record})
74+
record, id := fixtures.NewElasticRecord()
75+
err := db.Insert([]*models.ElasticRecord{record})
8976
db.GetClient().Refresh("_all").Do(context.Background())
9077
var recordFromES fixtures.FixtureRecord
9178
if assert.NoError(t, err) {
92-
count, err := db.GetClient().Count(index).Do(context.Background())
79+
count, err := db.GetClient().Count(record.Index).Do(context.Background())
9380
if assert.NoError(t, err) {
9481
assert.Equal(t, int64(1), count)
9582
}
96-
res, err := db.GetClient().Get().Index(index).Type(record.Topic).Id(record.GetId()).Do(context.Background())
83+
res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background())
9784
if assert.NoError(t, err) {
9885
json.Unmarshal(*res.Source, &recordFromES)
9986
}
10087
assert.Equal(t, recordFromES.Id, id)
10188
}
102-
db.GetClient().DeleteByQuery(index).Query(elastic.MatchAllQuery{}).Do(context.Background())
89+
db.GetClient().DeleteByQuery(record.Index).Query(elastic.MatchAllQuery{}).Do(context.Background())
10390
}
10491

10592
func TestRecordDatabase_Insert_Multiple(t *testing.T) {
106-
now := time.Now()
107-
record, id, _ := fixtures.NewRecord(now)
108-
index := fmt.Sprintf("%s-%s", config.Index, record.FormatTimestamp())
109-
err := db.Insert([]*models.Record{record, record})
93+
record, id := fixtures.NewElasticRecord()
94+
err := db.Insert([]*models.ElasticRecord{record, record})
11095
db.GetClient().Refresh("_all").Do(context.Background())
11196
var recordFromES fixtures.FixtureRecord
11297
if assert.NoError(t, err) {
113-
count, err := db.GetClient().Count(index).Do(context.Background())
98+
count, err := db.GetClient().Count(record.Index).Do(context.Background())
11499
if assert.NoError(t, err) {
115100
assert.Equal(t, int64(1), count)
116101
}
117-
res, err := db.GetClient().Get().Index(index).Type(record.Topic).Id(record.GetId()).Do(context.Background())
102+
res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background())
118103
if assert.NoError(t, err) {
119104
json.Unmarshal(*res.Source, &recordFromES)
120105
}
121106
assert.Equal(t, recordFromES.Id, id)
122107
}
123-
db.GetClient().DeleteByQuery(index).Query(elastic.MatchAllQuery{}).Do(context.Background())
124-
}
125-
126-
func TestRecordDatabase_Insert_IndexColumnBlacklist(t *testing.T) {
127-
now := time.Now()
128-
record, id, value := fixtures.NewRecord(now)
129-
index := fmt.Sprintf("%s-%d", config.Index, id)
130-
err := dbIndexColumnBlacklist.Insert([]*models.Record{record})
131-
dbIndexColumnBlacklist.GetClient().Refresh("_all").Do(context.Background())
132-
var recordFromES fixtures.FixtureRecord
133-
if assert.NoError(t, err) {
134-
count, err := dbIndexColumnBlacklist.GetClient().Count(index).Do(context.Background())
135-
if assert.NoError(t, err) {
136-
assert.Equal(t, int64(1), count)
137-
}
138-
res, err := dbIndexColumnBlacklist.GetClient().Get().Index(index).Type(record.Topic).Id(record.GetId()).Do(context.Background())
139-
if assert.NoError(t, err) {
140-
json.Unmarshal(*res.Source, &recordFromES)
141-
}
142-
assert.Empty(t, recordFromES.Id)
143-
assert.Equal(t, value, recordFromES.Value)
144-
}
145-
dbIndexColumnBlacklist.GetClient().DeleteByQuery(index).Query(elastic.MatchAllQuery{}).Do(context.Background())
108+
db.GetClient().DeleteByQuery(record.Index).Query(elastic.MatchAllQuery{}).Do(context.Background())
146109
}
147110

148111
func setupDB(d RecordDatabase) {

0 commit comments

Comments
 (0)