Skip to content
This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 2941dcb

Browse files
authored
Merge pull request #10 from inloco/feature/hour-index
feat(codec): add hour index option
2 parents 7d171d8 + 4e0062b commit 2941dcb

File tree

7 files changed

+124
-11
lines changed

7 files changed

+124
-11
lines changed

Gopkg.lock

Lines changed: 76 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ To create new injectors for your topics, you should create a new kubernetes depl
3030
- `METRICS_PORT` Port to export app metrics **REQUIRED**
3131
- `ES_BULK_TIMEOUT` Timeout for elasticsearch bulk writes in the format of golang's `time.ParseDuration`. Default value is 1s **OPTIONAL**
3232
- `ES_BULK_BACKOFF` Constant backoff when elasticsearch is overloaded. in the format of golang's `time.ParseDuration`. Default value is 1s **OPTIONAL**
33+
- `ES_TIME_SUFFIX` Indicates what time unit to append to index names on elasticsearch. Supported values are `day` and `hour`. Default value is `day` **OPTIONAL**
3334
- `KAFKA_CONSUMER_RECORD_TYPE` Kafka record type. Should be set to "avro" or "json". Defaults to avro. **OPTIONAL**
3435
- `KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL` The interval which the app updates the exported metrics in the format of golang's `time.ParseDuration`. Defaults to 30s. **OPTIONAL**
3536

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.5.3
1+
0.6.0

src/elasticsearch/codec.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ func (c basicCodec) getDatabaseIndex(record *models.Record) (string, error) {
5252
}
5353

5454
indexColumn := c.config.IndexColumn
55-
indexSuffix := record.FormatTimestamp()
55+
indexSuffix := record.FormatTimestampDay()
56+
if c.config.TimeSuffix == TimeSuffixHour {
57+
indexSuffix = record.FormatTimestampHour()
58+
}
5659
if indexColumn != "" {
5760
newIndexSuffix, err := record.GetValueForField(indexColumn)
5861
if err != nil {

src/elasticsearch/codec_test.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,27 @@ func TestCodec_EncodeElasticRecords(t *testing.T) {
2525
elasticRecords, err := codec.EncodeElasticRecords([]*models.Record{record})
2626
if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) {
2727
elasticRecord := elasticRecords[0]
28-
assert.Equal(t, fmt.Sprintf("%s-%s", record.Topic, record.FormatTimestamp()), elasticRecord.Index)
28+
assert.Equal(t, fmt.Sprintf("%s-%s", record.Topic, record.FormatTimestampDay()), elasticRecord.Index)
29+
assert.Equal(t, record.Topic, elasticRecord.Type)
30+
assert.Equal(t, fmt.Sprintf("%d:%d", record.Partition, record.Offset), elasticRecord.ID)
31+
assert.Equal(t, id, elasticRecord.Json["id"])
32+
assert.Equal(t, value, elasticRecord.Json["value"])
33+
}
34+
}
35+
36+
func TestCodec_EncodeElasticRecordsHourSuffix(t *testing.T) {
37+
codec := &basicCodec{
38+
config: Config{
39+
TimeSuffix: TimeSuffixHour,
40+
},
41+
logger: codecLogger,
42+
}
43+
record, id, value := fixtures.NewRecord(time.Now())
44+
45+
elasticRecords, err := codec.EncodeElasticRecords([]*models.Record{record})
46+
if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) {
47+
elasticRecord := elasticRecords[0]
48+
assert.Equal(t, fmt.Sprintf("%s-%s", record.Topic, record.FormatTimestampHour()), elasticRecord.Index)
2949
assert.Equal(t, record.Topic, elasticRecord.Type)
3050
assert.Equal(t, fmt.Sprintf("%d:%d", record.Partition, record.Offset), elasticRecord.ID)
3151
assert.Equal(t, id, elasticRecord.Json["id"])

src/elasticsearch/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ import (
66
"time"
77
)
88

9+
type TimeIndexSuffix int
10+
11+
const (
12+
TimeSuffixDay TimeIndexSuffix = 0
13+
TimeSuffixHour TimeIndexSuffix = 1
14+
)
15+
916
type Config struct {
1017
Host string
1118
Index string
@@ -14,6 +21,7 @@ type Config struct {
1421
BlacklistedColumns []string
1522
BulkTimeout time.Duration
1623
Backoff time.Duration
24+
TimeSuffix TimeIndexSuffix
1725
}
1826

1927
func NewConfig() Config {
@@ -33,6 +41,13 @@ func NewConfig() Config {
3341
backoff = d
3442
}
3543
}
44+
timeSuffix := TimeSuffixDay
45+
if suffix := os.Getenv("ES_TIME_SUFFIX"); suffix != "" {
46+
switch suffix {
47+
case "hour":
48+
timeSuffix = TimeSuffixHour
49+
}
50+
}
3651
return Config{
3752
Host: os.Getenv("ELASTICSEARCH_HOST"),
3853
Index: os.Getenv("ES_INDEX"),
@@ -41,5 +56,6 @@ func NewConfig() Config {
4156
BlacklistedColumns: strings.Split(os.Getenv("ES_BLACKLISTED_COLUMNS"), ","),
4257
BulkTimeout: timeout,
4358
Backoff: backoff,
59+
TimeSuffix: timeSuffix,
4460
}
4561
}

src/models/record.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@ type Record struct {
1515
Json map[string]interface{}
1616
}
1717

18-
func (r *Record) FormatTimestamp() string {
18+
func (r *Record) FormatTimestampDay() string {
1919
return r.Timestamp.Format("2006-01-02")
2020
}
2121

22+
func (r *Record) FormatTimestampHour() string {
23+
return r.Timestamp.Format("2006-01-02T15")
24+
}
25+
2226
func (r *Record) GetId() string {
2327
return fmt.Sprintf("%d:%d", r.Partition, r.Offset)
2428
}

0 commit comments

Comments
 (0)